Source code for xuance.mindspore.policies.deterministic

import numpy as np
import mindspore.nn as nn
from copy import deepcopy
from gymnasium.spaces import Space, Discrete
from xuance.common import Sequence, Optional, Callable, Union
from xuance.mindspore import ms, ops, Module, Tensor
from xuance.mindspore.utils import ModuleType
from .core import BasicQhead, BasicRecurrent, DuelQhead, C51Qhead, QRDQNhead, ActorNet, CriticNet


[docs] class BasicQnetwork(Module): """ The base class to implement DQN based policy Args: action_space (Discrete): The action space, which type is gym.spaces.Discrete. representation (Module): The representation module. hidden_size (Sequence[int]): List of hidden units for fully connect layers. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. use_distributed_training (bool): Whether to use multi-GPU for distributed training. """ def __init__(self, action_space: Discrete, representation: Module, hidden_size: Sequence[int] = None, normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, use_distributed_training: bool = False): super(BasicQnetwork, self).__init__() self.action_dim = action_space.n self.representation = representation self.target_representation = deepcopy(representation) self.representation_info_shape = self.representation.output_shapes self.eval_Qhead = BasicQhead(self.representation.output_shapes['state'][0], self.action_dim, hidden_size, normalize, initialize, activation) self.target_Qhead = deepcopy(self.eval_Qhead)
[docs] def trainable_params(self, recurse=True): return self.representation.trainable_params() + self.eval_Qhead.trainable_params()
[docs] def construct(self, observation: Tensor): """ Returns the output of the representation, greedy actions, and the evaluated Q-values. Parameters: observation: The original observation input. Returns: outputs: The hidden state output by the representation. argmax_action: The greedy actions. evalQ: The evaluated Q-values. """ outputs = self.representation(observation) evalQ = self.eval_Qhead(outputs) argmax_action = evalQ.argmax(axis=-1) return outputs, argmax_action, evalQ
[docs] def target(self, observation: Tensor): """ Returns the output of the representation, greedy actions, and the evaluated Q-values via target networks. Parameters: observation: The original observation input. Returns: outputs_target: The hidden state output by the representation. argmax_action: The greedy actions from target networks. targetQ: The evaluated Q-values output by target Q-network. """ outputs_target = self.target_representation(observation) targetQ = self.target_Qhead(outputs_target) argmax_action = targetQ.argmax(axis=-1) return outputs_target, argmax_action, targetQ
[docs] def copy_target(self): for ep, tp in zip(self.representation.trainable_params(), self.target_representation.trainable_params()): tp.assign_value(ep) for ep, tp in zip(self.eval_Qhead.trainable_params(), self.target_Qhead.trainable_params()): tp.assign_value(ep)
[docs] class DuelQnetwork(Module): """ The policy for deep dueling Q-networks. Args: action_space (Space): The action space, which type is gym.spaces.Discrete. representation (ModuleType): The representation module. hidden_size (Sequence[int]): List of hidden units for fully connect layers. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. use_distributed_training (bool): Whether to use multi-GPU for distributed training. """ def __init__(self, action_space: Space, representation: ModuleType, hidden_size: Sequence[int] = None, normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, use_distributed_training: bool = False): super(DuelQnetwork, self).__init__() self.action_dim = action_space.n self.representation = representation self.target_representation = deepcopy(representation) self.representation_info_shape = self.representation.output_shapes self.eval_Qhead = DuelQhead(self.representation.output_shapes['state'][0], self.action_dim, hidden_size, normalize, initialize, activation) self.target_Qhead = deepcopy(self.eval_Qhead)
[docs] def construct(self, observation: Tensor): outputs = self.representation(observation) evalQ = self.eval_Qhead(outputs) argmax_action = evalQ.argmax(axis=-1) return outputs, argmax_action, evalQ
[docs] def target(self, observation: Tensor): outputs_target = self.target_representation(observation) targetQ = self.target_Qhead(outputs_target) argmax_action = targetQ.argmax(axis=-1) return outputs_target, argmax_action, targetQ
[docs] def trainable_params(self, recurse=True): return self.representation.trainable_params() + self.eval_Qhead.trainable_params()
[docs] def copy_target(self): for ep, tp in zip(self.representation.trainable_params(), self.target_representation.trainable_params()): tp.assign_value(ep) for ep, tp in zip(self.eval_Qhead.trainable_params(), self.target_Qhead.trainable_params()): tp.assign_value(ep)
[docs] class NoisyQnetwork(Module): """ The policy for noisy deep Q-networks. Args: action_space (Discrete): The action space, which type is gym.spaces.Discrete. representation (Module): The representation module. hidden_size: List of hidden units for fully connect layers. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. use_distributed_training (bool): Whether to use multi-GPU for distributed training. """ def __init__(self, action_space: Discrete, representation: ModuleType, hidden_size: Sequence[int] = None, normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, use_distributed_training: bool = False): super(NoisyQnetwork, self).__init__() self.action_dim = action_space.n self.representation = representation self.target_representation = deepcopy(representation) self.representation_info_shape = self.representation.output_shapes self.eval_Qhead = BasicQhead(self.representation.output_shapes['state'][0], self.action_dim, hidden_size, normalize, initialize, activation) self.target_Qhead = deepcopy(self.eval_Qhead) self._stdnormal = ms.ops.StandardNormal() self._assign = ms.ops.Assign()
[docs] def trainable_params(self, recurse=True): return self.representation.trainable_params() + self.eval_Qhead.trainable_params()
[docs] def update_noise(self, noisy_bound: float = 0.0): """Updates the noises for network parameters.""" self.eval_noise_parameter = [] self.target_noise_parameter = [] for parameter in self.eval_Qhead.trainable_params(): self.eval_noise_parameter.append(self._stdnormal(parameter.shape) * noisy_bound) self.target_noise_parameter.append(self._stdnormal(parameter.shape) * noisy_bound)
[docs] def noisy_parameters(self, is_target=False): self.update_noise(self.noise_scale) if is_target: for parameter, noise_param in zip(self.eval_Qhead.trainable_params(), self.eval_noise_parameter): _ = self._assign(parameter, parameter + noise_param) else: for parameter, noise_param in zip(self.target_Qhead.trainable_params(), self.target_noise_parameter): _ = self._assign(parameter, parameter + noise_param)
[docs] def construct(self, observation: Tensor): """ Returns the output of the representation, greedy actions, and the evaluated Q-values. Parameters: observation: The original observation input. Returns: outputs: The hidden state output by the representation. argmax_action: The greedy actions. evalQ: The evaluated Q-values. """ outputs = self.representation(observation) evalQ = self.eval_Qhead(outputs) argmax_action = evalQ.argmax(axis=-1) return outputs, argmax_action, evalQ
[docs] def target(self, observation: Tensor): """ Returns the output of the representation, greedy actions, and the evaluated Q-values via target networks. Parameters: observation: The original observation input. Returns: outputs_target: The hidden state output by the representation. argmax_action: The greedy actions from target networks. targetQ: The evaluated Q-values output by target Q-network. """ outputs = self.target_representation(observation) self.noisy_parameters(is_target=True) targetQ = self.target_Qhead(outputs) argmax_action = targetQ.argmax(axis=-1) return outputs, argmax_action, targetQ
[docs] def copy_target(self): for ep, tp in zip(self.representation.trainable_params(), self.target_representation.trainable_params()): tp.assign_value(ep) for ep, tp in zip(self.eval_Qhead.trainable_params(), self.target_Qhead.trainable_params()): tp.assign_value(ep)
[docs] class C51Qnetwork(Module): """ The policy for C51 distributional deep Q-networks. Args: action_space (Discrete): The action space, which type is gym.spaces.Discrete. atom_num (int): The number of atoms. v_min (float): The lower bound of value distribution. v_max (float): The upper bound of value distribution. representation (ModuleType): The representation module. hidden_size (Sequence[int]): List of hidden units for fully connect layers. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. use_distributed_training (bool): Whether to use multi-GPU for distributed training. """ def __init__(self, action_space: Discrete, atom_num: int, v_min: float, v_max: float, representation: ModuleType, hidden_size: Sequence[int] = None, normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, use_distributed_training: bool = False): super(C51Qnetwork, self).__init__() self.action_dim = action_space.n self.atom_num = atom_num self.v_min = v_min self.v_max = v_max self.representation = representation self.target_representation = deepcopy(representation) self.representation_info_shape = self.representation.output_shapes self.eval_Zhead = C51Qhead(self.representation.output_shapes['state'][0], self.action_dim, self.atom_num, hidden_size, normalize, initialize, activation) self.target_Zhead = deepcopy(self.eval_Zhead) self._LinSpace = ms.ops.LinSpace() self.supports = ms.Parameter(self._LinSpace(Tensor(self.v_min, ms.float32), Tensor(self.v_max, ms.float32), self.atom_num), requires_grad=False) self.deltaz = (v_max - v_min) / (atom_num - 1)
[docs] def construct(self, observation: Union[np.ndarray, dict]): """ Returns the output of the representation, greedy actions, and the evaluated Z-values. Parameters: observation: The original observation input. Returns: outputs: The hidden state output by the representation. argmax_action: The greedy actions. eval_Z: The evaluated Z-values. """ outputs = self.representation(observation) eval_Z = self.eval_Zhead(outputs) eval_Q = (self.supports * eval_Z).sum(-1) argmax_action = eval_Q.argmax(axis=-1) return outputs, argmax_action, eval_Z
[docs] def target(self, observation: Union[np.ndarray, dict]): """ Returns the output of the representation, greedy actions, and the evaluated Z-values via target networks. Parameters: observation: The original observation input. Returns: outputs_target: The hidden state output by the representation. argmax_action: The greedy actions from target networks. target_Z: The evaluated Z-values output by target Z-network. """ outputs = self.target_representation(observation) target_Z = self.target_Zhead(outputs) target_Q = (self.supports * target_Z).sum(-1) argmax_action = target_Q.argmax(dim=-1) return outputs, argmax_action, target_Z
[docs] def copy_target(self): for ep, tp in zip(self.representation.trainable_params(), self.target_representation.trainable_params()): tp.assign_value(ep) for ep, tp in zip(self.eval_Zhead.trainable_params(), self.target_Zhead.trainable_params()): tp.assign_value(ep)
[docs] class QRDQN_Network(Module): """ The policy for quantile regression deep Q-networks. Args: action_space (Discrete): The action space, which type is gym.spaces.Discrete. quantile_num (int): The number of quantiles. representation (ModuleType): The representation module. hidden_size (Sequence[int]): List of hidden units for fully connect layers. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. use_distributed_training (bool): Whether to use multi-GPU for distributed training. """ def __init__(self, action_space: Discrete, quantile_num: int, representation: ModuleType, hidden_size: Sequence[int] = None, normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, use_distributed_training: bool = False): super(QRDQN_Network, self).__init__() self.action_dim = action_space.n self.quantile_num = quantile_num self.representation = representation self.target_representation = deepcopy(representation) self.representation_info_shape = self.representation.output_shapes self.eval_Zhead = QRDQNhead(self.representation.output_shapes['state'][0], self.action_dim, self.quantile_num, hidden_size, normalize, initialize, activation) self.target_Zhead = deepcopy(self.eval_Zhead) self._mean = ms.ops.ReduceMean()
[docs] def construct(self, observation: Tensor): """ Returns the output of the representation, greedy actions, and the evaluated Z-values. Parameters: observation: The original observation input. Returns: outputs: The hidden state output by the representation. argmax_action: The greedy actions. eval_Z: The evaluated Z-values. """ outputs = self.representation(observation) evalZ = self.eval_Zhead(outputs) evalQ = self._mean(evalZ, -1) argmax_action = evalQ.argmax(axis=-1) return outputs, argmax_action, evalZ
[docs] def target(self, observation: Tensor): """ Returns the output of the representation, greedy actions, and the evaluated Z-values via target networks. Parameters: observation: The original observation input. Returns: outputs_target: The hidden state output by the representation. argmax_action: The greedy actions from target networks. target_Z: The evaluated Z-values output by target Z-network. """ outputs = self.target_representation(observation) target_Z = self.target_Zhead(outputs) target_Q = self._mean(target_Z, -1) argmax_action = target_Q.argmax(axis=-1) return outputs, argmax_action, target_Z
[docs] def trainable_params(self, recurse=True): return self.representation.trainable_params() + self.eval_Zhead.trainable_params()
[docs] def copy_target(self): for ep, tp in zip(self.representation.trainable_params(), self.target_representation.trainable_params()): tp.assign_value(ep) for ep, tp in zip(self.eval_Zhead.trainable_params(), self.target_Zhead.trainable_params()): tp.assign_value(ep)
[docs] class DDPGPolicy(Module): """ The policy of deep deterministic policy gradient. Args: action_space (Space): The action space. representation (Module): The representation module. actor_hidden_size (Sequence[int]): List of hidden units for actor network. critic_hidden_size (Sequence[int]): List of hidden units for critic network. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. activation_action (Optional[ModuleType]): The activation of final layer to bound the actions. """ def __init__(self, action_space: Space, representation: ModuleType, actor_hidden_size: Sequence[int], critic_hidden_size: Sequence[int], normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, activation_action: Optional[ModuleType] = None): super(DDPGPolicy, self).__init__() self.action_dim = action_space.shape[0] self.representation_info_shape = representation.output_shapes # create networks self.actor_representation = representation self.actor = ActorNet(representation.output_shapes['state'][0], self.action_dim, actor_hidden_size, normalize, initialize, activation, activation_action) self.critic_representation = deepcopy(representation) self.critic = CriticNet(representation.output_shapes['state'][0] + self.action_dim, critic_hidden_size, normalize, initialize, activation) # create target networks self.target_actor_representation = deepcopy(self.actor_representation) self.target_actor = deepcopy(self.actor) self.target_critic_representation = deepcopy(self.critic_representation) self.target_critic = deepcopy(self.critic) # parameters self.actor_parameters = self.actor_representation.trainable_params() + self.actor.trainable_params() self.critic_parameters = self.critic_representation.trainable_params() + self.critic.trainable_params()
[docs] def construct(self, observation: Tensor): """ Returns the output of the actor representations, and the actions. Parameters: observation: The original observation input. Returns: outputs: The output of the actor representations. act: The actions calculated by the actor. """ outputs = self.actor_representation(observation) act = self.actor(outputs) return outputs, act
[docs] def Qtarget(self, observation: Union[np.ndarray, dict]): """Returns the evaluated Q-values via target networks.""" outputs_actor = self.target_actor_representation(observation) outputs_critic = self.target_critic_representation(observation) act = self.target_actor(outputs_actor) q_ = self.target_critic(ops.concat([outputs_critic, act], axis=-1)) return q_
[docs] def Qaction(self, observation: Union[np.ndarray, dict], action: Tensor): """Returns the evaluated Q-values of state-action pairs.""" outputs = self.critic_representation(observation) q = self.critic(ops.concat([outputs, action], axis=-1)) return q
[docs] def Qpolicy(self, observation: Union[np.ndarray, dict]): """Returns the evaluated Q-values by calculating actions via actor networks.""" outputs_actor = self.actor_representation(observation) act = self.actor(outputs_actor) outputs_critic = self.critic_representation(observation) q_eval = self.critic(ops.concat([outputs_critic, act], axis=-1)) return q_eval
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.actor_representation.trainable_params(), self.target_actor_representation.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.actor.trainable_params(), self.target_actor.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.critic_representation.trainable_params(), self.target_critic_representation.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.critic.trainable_params(), self.target_critic.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data))
[docs] class TD3Policy(Module): """ The policy of twin delayed deep deterministic policy gradient. Args: action_space (Space): The action space. representation (Module): The representation module. actor_hidden_size (Sequence[int]): List of hidden units for actor network. critic_hidden_size (Sequence[int]): List of hidden units for critic network. normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs. initialize (Optional[Callable[..., Tensor]]): The parameters initializer. activation (Optional[ModuleType]): The activation function for each layer. activation_action (Optional[ModuleType]): The activation of final layer to bound the actions. """ def __init__(self, action_space: Space, representation: ModuleType, actor_hidden_size: Sequence[int], critic_hidden_size: Sequence[int], normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None, activation_action: Optional[ModuleType] = None): super(TD3Policy, self).__init__() self.action_dim = action_space.shape[0] self.representation_info_shape = representation.output_shapes self.actor_representation = representation self.critic_A_representation = deepcopy(representation) self.critic_B_representation = deepcopy(representation) self.target_actor_representation = deepcopy(representation) self.target_critic_A_representation = deepcopy(representation) self.target_critic_B_representation = deepcopy(representation) self.actor = ActorNet(representation.output_shapes['state'][0], self.action_dim, actor_hidden_size, normalize, initialize, activation, activation_action) self.critic_A = CriticNet(representation.output_shapes['state'][0] + self.action_dim, critic_hidden_size, normalize, initialize, activation) self.critic_B = CriticNet(representation.output_shapes['state'][0] + self.action_dim, critic_hidden_size, normalize, initialize, activation) self.target_actor = deepcopy(self.actor) self.target_critic_A = deepcopy(self.critic_A) self.target_critic_B = deepcopy(self.critic_B) # parameters self.actor_parameters = self.actor_representation.trainable_params() + self.actor.trainable_params() self.critic_parameters = self.critic_A_representation.trainable_params() + self.critic_A.trainable_params() + \ self.critic_B_representation.trainable_params() + self.critic_B.trainable_params()
[docs] def construct(self, observation: Union[Tensor, dict]): """ Returns the output of the actor representations, and the actions. Parameters: observation: The original observation input. Returns: outputs: The output of the actor representations. act: The actions calculated by the actor. """ outputs = self.actor_representation(observation) act = self.actor(outputs) return outputs, act
[docs] def Qtarget(self, observation: Union[Tensor, dict]): """Returns the evaluated Q-values via target networks.""" outputs_actor = self.target_actor_representation(observation) outputs_critic_A = self.target_critic_A_representation(observation) outputs_critic_B = self.target_critic_B_representation(observation) act = self.target_actor(outputs_actor) noise = (ops.randn_like(act) * 0.2).clamp(-0.5, 0.5) act = (act + noise).clamp(-1, 1) qa = self.target_critic_A(ops.concat([outputs_critic_A, act], axis=-1)) qb = self.target_critic_B(ops.concat([outputs_critic_B, act], axis=-1)) min_q = ops.minimum(qa, qb) return min_q
[docs] def Qaction(self, observation: Union[Tensor, dict], action: Tensor): """Returns the evaluated Q-values of state-action pairs.""" outputs_critic_A = self.critic_A_representation(observation) outputs_critic_B = self.critic_B_representation(observation) q_eval_a = self.critic_A(ops.concat([outputs_critic_A, action], axis=-1)) q_eval_b = self.critic_B(ops.concat([outputs_critic_B, action], axis=-1)) return q_eval_a, q_eval_b
[docs] def Qpolicy(self, observation: Union[Tensor, dict]): """Returns the evaluated Q-values by calculating actions via actor networks.""" outputs_actor = self.actor_representation(observation) outputs_critic_A = self.critic_A_representation(observation) outputs_critic_B = self.critic_B_representation(observation) act = self.actor(outputs_actor) q_eval_a = self.critic_A(ops.concat([outputs_critic_A, act], axis=-1)).unsqueeze(dim=1) q_eval_b = self.critic_B(ops.concat([outputs_critic_B, act], axis=-1)).unsqueeze(dim=1) return (q_eval_a + q_eval_b) / 2.0
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.actor_representation.trainable_params(), self.target_actor_representation.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.actor.trainable_params(), self.target_actor.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.critic_A_representation.trainable_params(), self.target_critic_A_representation.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.critic_A.trainable_params(), self.target_critic_A.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.critic_B_representation.trainable_params(), self.target_critic_B_representation.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.critic_B.trainable_params(), self.target_critic_B.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data))
[docs] class PDQNPolicy(Module): def __init__(self, observation_space, action_space, representation: ModuleType, conactor_hidden_size: Sequence[int], qnetwork_hidden_size: Sequence[int], normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None): super(PDQNPolicy, self).__init__() self.representation = representation self.observation_space = observation_space self.action_space = action_space self.num_disact = self.action_space.spaces[0].n self.conact_sizes = np.array([self.action_space.spaces[i].shape[0] for i in range(1, self.num_disact + 1)]) self.conact_size = int(self.conact_sizes.sum()) self.qnetwork = BasicQhead(self.observation_space.shape[0] + self.conact_size, self.num_disact, qnetwork_hidden_size, normalize, initialize, nn.ReLU) self.conactor = ActorNet(self.observation_space.shape[0], self.conact_size, conactor_hidden_size, initialize, nn.ReLU) self.target_conactor = deepcopy(self.conactor) self.target_qnetwork = deepcopy(self.qnetwork) self._concat = ms.ops.Concat(1)
[docs] def Atarget(self, state): target_conact = self.target_conactor(state) return target_conact
[docs] def con_action(self, state): state = state.expand_dims(0).astype(ms.float32) conaction = self.conactor(state).squeeze() return conaction
[docs] def Qtarget(self, state, action): input_q = self._concat((state, action)) target_q = self.target_qnetwork(input_q) return target_q
[docs] def Qeval(self, state, action): state = state.astype(ms.float32) input_q = self._concat((state, action)) eval_q = self.qnetwork(input_q) return eval_q
[docs] def Qpolicy(self, state): conact = self.conactor(state) input_q = self._concat((state, conact)) policy_q = (self.qnetwork(input_q)).sum() return policy_q
[docs] def construct(self): return super().construct()
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.conactor.trainable_params(), self.target_conactor.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.qnetwork.trainable_params(), self.target_qnetwork.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data))
[docs] class MPDQNPolicy(Module): def __init__(self, observation_space, action_space, representation: ModuleType, conactor_hidden_size: Sequence[int], qnetwork_hidden_size: Sequence[int], normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None): super(MPDQNPolicy, self).__init__() self.representation = representation self.observation_space = observation_space self.obs_size = self.observation_space.shape[0] self.action_space = action_space self.num_disact = self.action_space.spaces[0].n self.conact_sizes = np.array([self.action_space.spaces[i].shape[0] for i in range(1, self.num_disact + 1)]) self.conact_size = int(self.conact_sizes.sum()) self.qnetwork = BasicQhead(self.observation_space.shape[0] + self.conact_size, self.num_disact, qnetwork_hidden_size, normalize, initialize, nn.ReLU) self.conactor = ActorNet(self.observation_space.shape[0], self.conact_size, conactor_hidden_size, initialize, nn.ReLU) self.target_conactor = deepcopy(self.conactor) self.target_qnetwork = deepcopy(self.qnetwork) self.offsets = self.conact_sizes.cumsum() self.offsets = np.insert(self.offsets, 0, 0) self.offsets = Tensor(self.offsets) self._concat = ms.ops.Concat(1) self._zeroslike = ms.ops.ZerosLike() self._squeeze = ms.ops.Squeeze(1)
[docs] def Atarget(self, state): target_conact = self.target_conactor(state) return target_conact
[docs] def con_action(self, state): # conaction = self.conactor(state) state = state.expand_dims(0).astype(ms.float32) conaction = self.conactor(state).squeeze() return conaction
[docs] def Qtarget(self, state, action): batch_size = state.shape[0] Q = [] input_q = self._concat((state, self._zeroslike(action))) input_q = input_q.repeat(self.num_disact, 0) input_q = input_q.asnumpy() action = action.asnumpy() for i in range(self.num_disact): input_q[i * batch_size:(i + 1) * batch_size, self.obs_size + self.offsets[i]: self.obs_size + self.offsets[i + 1]] \ = action[:, self.offsets[i]:self.offsets[i + 1]] input_q = Tensor(input_q, dtype=ms.float32) eval_qall = self.target_qnetwork(input_q) for i in range(self.num_disact): eval_q = eval_qall[i * batch_size:(i + 1) * batch_size, i] if len(eval_q.shape) == 1: eval_q = eval_q.expand_dims(1) Q.append(eval_q) Q = self._concat(Q) return Q
[docs] def Qeval(self, state, action, input_q): # state = state.astype(ms.float32) batch_size = state.shape[0] Q = [] # input_q = self._concat((state, self._zeroslike(action))) # input_q = input_q.repeat(self.num_disact, 0) # input_q = input_q.asnumpy() # action = action.asnumpy() # for i in range(self.num_disact): # input_q[i * batch_size:(i + 1) * batch_size, self.obs_size + self.offsets[i]: self.obs_size + self.offsets[i + 1]] \ # = action[:, self.offsets[i]:self.offsets[i + 1]] # # = self._squeeze(action[:, self.offsets[i]:self.offsets[i + 1]]) # input_q = Tensor(input_q, dtype=ms.float32) eval_qall = self.qnetwork(input_q) for i in range(self.num_disact): eval_q = eval_qall[i * batch_size:(i + 1) * batch_size, i] if len(eval_q.shape) == 1: eval_q = eval_q.expand_dims(1) Q.append(eval_q) Q = self._concat(Q) return Q
[docs] def Qpolicy(self, state, input_q): # conact = self.conactor(state) batch_size = state.shape[0] Q = [] # input_q = self._concat((state, self._zeroslike(conact))) # input_q = input_q.repeat(self.num_disact, 0) # for i in range(self.num_disact): # input_q[i * batch_size:(i + 1) * batch_size, # self.obs_size + self.offsets[i]: self.obs_size + self.offsets[i + 1]] \ # = conact[:, self.offsets[i]:self.offsets[i + 1]] eval_qall = self.qnetwork(input_q) for i in range(self.num_disact): eval_q = eval_qall[i * batch_size:(i + 1) * batch_size, i] if len(eval_q.shape) == 1: eval_q = eval_q.expand_dims(1) Q.append(eval_q) Q = self._concat(Q) return Q
[docs] def construct(self): return super().construct()
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.conactor.trainable_params(), self.target_conactor.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.qnetwork.trainable_params(), self.target_qnetwork.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data))
[docs] class SPDQNPolicy(Module): def __init__(self, observation_space, action_space, representation: ModuleType, conactor_hidden_size: Sequence[int], qnetwork_hidden_size: Sequence[int], normalize: Optional[ModuleType] = None, initialize: Optional[Callable[..., Tensor]] = None, activation: Optional[ModuleType] = None): super(SPDQNPolicy, self).__init__() self.representation = representation self.observation_space = observation_space self.obs_size = self.observation_space.shape[0] self.action_space = action_space self.num_disact = self.action_space.spaces[0].n self.conact_sizes = np.array([self.action_space.spaces[i].shape[0] for i in range(1, self.num_disact + 1)]) self.conact_size = int(self.conact_sizes.sum()) self.qnetwork = BasicQhead(self.observation_space.shape[0] + self.conact_size, self.num_disact, qnetwork_hidden_size, normalize, initialize, nn.ReLU) self.conactor = ActorNet(self.observation_space.shape[0], self.conact_size, conactor_hidden_size, initialize, nn.ReLU) self.target_conactor = deepcopy(self.conactor) self.target_qnetwork = deepcopy(self.qnetwork) self.offsets = self.conact_sizes.cumsum() self.offsets = np.insert(self.offsets, 0, 0) self.offsets = Tensor(self.offsets) self._concat = ms.ops.Concat(1) self._zeroslike = ms.ops.ZerosLike() self._squeeze = ms.ops.Squeeze(1)
[docs] def Atarget(self, state): target_conact = self.target_conactor(state) return target_conact
[docs] def con_action(self, state): state = state.expand_dims(0).astype(ms.float32) conaction = self.conactor(state).squeeze() return conaction
[docs] def Qtarget(self, state, action): batch_size = state.shape[0] Q = [] input_q = self._concat((state, self._zeroslike(action))) input_q = input_q.repeat(self.num_disact, 0) input_q = input_q.asnumpy() action = action.asnumpy() for i in range(self.num_disact): input_q[i * batch_size:(i + 1) * batch_size, self.obs_size + self.offsets[i]: self.obs_size + self.offsets[i + 1]] \ = action[:, self.offsets[i]:self.offsets[i + 1]] input_q = Tensor(input_q, dtype=ms.float32) eval_qall = self.target_qnetwork(input_q) for i in range(self.num_disact): eval_q = eval_qall[i * batch_size:(i + 1) * batch_size, i] if len(eval_q.shape) == 1: eval_q = eval_q.expand_dims(1) Q.append(eval_q) Q = self._concat(Q) return Q
[docs] def Qeval(self, state, action, input_q): batch_size = state.shape[0] Q = [] eval_qall = self.qnetwork(input_q) for i in range(self.num_disact): eval_q = eval_qall[i * batch_size:(i + 1) * batch_size, i] if len(eval_q.shape) == 1: eval_q = eval_q.expand_dims(1) Q.append(eval_q) Q = self._concat(Q) return Q
[docs] def Qpolicy(self, state, input_q): batch_size = state.shape[0] Q = [] eval_qall = self.qnetwork(input_q) for i in range(self.num_disact): eval_q = eval_qall[i * batch_size:(i + 1) * batch_size, i] if len(eval_q.shape) == 1: eval_q = eval_q.expand_dims(1) Q.append(eval_q) Q = self._concat(Q) return Q
[docs] def construct(self): return super().construct()
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.conactor.trainable_params(), self.target_conactor.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data)) for ep, tp in zip(self.qnetwork.trainable_params(), self.target_qnetwork.trainable_params()): tp.assign_value((tau * ep.data + (1 - tau) * tp.data))
[docs] class DRQNPolicy(Module): def __init__(self, action_space: Discrete, representation: Module, **kwargs): super(DRQNPolicy, self).__init__() self.recurrent_layer_N = kwargs['recurrent_layer_N'] self.rnn_hidden_dim = kwargs['recurrent_hidden_size'] self.action_dim = action_space.n self.representation = representation self.target_representation = deepcopy(representation) self.representation_info_shape = self.representation.output_shapes kwargs["input_dim"] = self.representation.output_shapes['state'][0] kwargs["action_dim"] = self.action_dim self.lstm = True if kwargs["rnn"] == "LSTM" else False self.cnn = True if self.representation.cls_name == "Basic_CNN" else False self.eval_Qhead = BasicRecurrent(**kwargs) self.target_Qhead = deepcopy(self.eval_Qhead) self._zeroslike = ms.ops.ZerosLike()
[docs] def construct(self, observation: Tensor, *rnn_hidden: Tensor): if self.cnn: obs_shape = observation.shape outputs = self.representation(observation.reshape((-1,) + obs_shape[-3:])) outputs = outputs.reshape(obs_shape[0:-3] + (-1,)) else: observation_shape = observation.shape outputs = self.representation(observation.reshape([-1, observation_shape[-1]])) outputs = outputs.reshape(observation_shape[:-1] + (-1, )) if self.lstm: hidden_states, cell_states, evalQ = self.eval_Qhead(outputs, rnn_hidden[0], rnn_hidden[1]) else: hidden_states, evalQ = self.eval_Qhead(outputs, rnn_hidden[0]) cell_states = None argmax_action = evalQ[:, -1].argmax(axis=-1) return outputs, argmax_action, evalQ, (hidden_states, cell_states)
[docs] def target(self, observation: Union[np.ndarray, dict], *rnn_hidden: Tensor): if self.cnn: obs_shape = observation.shape outputs = self.representation(observation.reshape((-1,) + obs_shape[-3:])) outputs = outputs.reshape(obs_shape[0:-3] + (-1,)) else: outputs = self.representation(observation) if self.lstm: hidden_states, cell_states, targetQ = self.target_Qhead(outputs, rnn_hidden[0], rnn_hidden[1]) else: hidden_states, targetQ = self.target_Qhead(outputs, rnn_hidden[0]) cell_states = None argmax_action = targetQ.argmax(axis=-1) return outputs, argmax_action, targetQ, (hidden_states, cell_states)
[docs] def init_hidden(self, batch): hidden_states = ms.ops.zeros(size=(self.recurrent_layer_N, batch, self.rnn_hidden_dim)) cell_states = self._zeroslike(hidden_states) if self.lstm else None return hidden_states, cell_states
[docs] def init_hidden_item(self, rnn_hidden, i): if self.lstm: rnn_hidden[0][:, i] = ms.ops.zeros(size=(self.recurrent_layer_N, self.rnn_hidden_dim)) rnn_hidden[1][:, i] = ms.ops.zeros(size=(self.recurrent_layer_N, self.rnn_hidden_dim)) return rnn_hidden else: rnn_hidden[:, i] = ms.ops.zeros(size=(self.recurrent_layer_N, self.rnn_hidden_dim)) return rnn_hidden
[docs] def copy_target(self): for ep, tp in zip(self.representation.trainable_params(), self.target_representation.trainable_params()): tp.assign_value(ep) for ep, tp in zip(self.eval_Qhead.trainable_params(), self.target_Qhead.trainable_params()): tp.assign_value(ep)