Source code for xuance.torch.policies.deterministic_marl

import os
from operator import itemgetter
import torch
from torch.distributions import Categorical
from torch.nn.functional import one_hot
from copy import deepcopy
from gymnasium.spaces import Discrete, Box
from xuance.common import Sequence, Optional, Callable, Union, Dict, List
from xuance.torch.policies import BasicQhead, ActorNet, CriticNet, VDN_mixer, QMIX_FF_mixer
from xuance.torch.representations import Basic_MLP
from xuance.torch.utils import ModuleType
from xuance.torch import Tensor, Module, ModuleDict, DistributedDataParallel


class BasicQnetwork(Module):
    """
    The base class to implement DQN based policy.

    For every key in ``model_keys`` the policy holds a representation module
    and a Q head, plus deep-copied target versions of both.

    Args:
        action_space (Optional[Dict[str, Discrete]]): The action space, which type is gym.spaces.Discrete.
        n_agents (int): The number of agents.
        representation (ModuleDict): A dict of the representation module for all agents.
        hidden_size (Sequence[int]): List of hidden units for fully connect layers.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters' initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Other arguments. The constructor reads the keys
            'use_parameter_sharing', 'model_keys', 'rnn' and 'use_rnn'.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Discrete]],
                 n_agents: int,
                 representation: ModuleDict,
                 hidden_size: Sequence[int] = None,
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super(BasicQnetwork, self).__init__()
        self.device = device
        self.action_space = action_space
        self.n_agents = n_agents
        self.use_parameter_sharing = kwargs['use_parameter_sharing']
        self.model_keys = kwargs['model_keys']
        self.representation_info_shape = {key: representation[key].output_shapes for key in self.model_keys}
        self.lstm = kwargs["rnn"] == "LSTM"
        self.use_rnn = bool(kwargs["use_rnn"])

        self.representation = representation
        # The target copy is taken before any DDP wrapping below.
        self.target_representation = deepcopy(self.representation)

        self.dim_input_Q, self.n_actions = {}, {}
        self.eval_Qhead, self.target_Qhead = ModuleDict(), ModuleDict()
        for key in self.model_keys:
            self.n_actions[key] = self.action_space[key].n
            self.dim_input_Q[key] = self.representation_info_shape[key]['state'][0]
            if self.use_parameter_sharing:
                # With shared parameters the agent ids are appended to the features.
                self.dim_input_Q[key] += self.n_agents
            self.eval_Qhead[key] = BasicQhead(self.dim_input_Q[key], self.n_actions[key], hidden_size,
                                              normalize, initialize, activation, device)
            self.target_Qhead[key] = deepcopy(self.eval_Qhead[key])

        # Prepare DDP module.
        self.distributed_training = use_distributed_training
        if self.distributed_training:
            self.rank = int(os.environ["RANK"])
            for key in self.model_keys:
                # "Basic_Identical" representations are deliberately left unwrapped.
                if self.representation[key]._get_name() != "Basic_Identical":
                    self.representation[key] = DistributedDataParallel(module=self.representation[key],
                                                                       device_ids=[self.rank])
                self.eval_Qhead[key] = DistributedDataParallel(module=self.eval_Qhead[key],
                                                               device_ids=[self.rank])

    @property
    def parameters_model(self):
        """Return, per model key, the list of trainable (non-target) parameters."""
        parameters_model = {}
        for key in self.model_keys:
            parameters_model[key] = list(self.representation[key].parameters()) + list(
                self.eval_Qhead[key].parameters())
        return parameters_model

    def _basic_q_inputs(self, representation, key, observation, agent_ids, rnn_hidden):
        """Run one agent's representation and build the input of its Q head.

        Returns:
            q_inputs: The representation 'state' output, concatenated with
                ``agent_ids`` on the last axis when parameter sharing is used.
            hidden_new: ``(rnn_hidden, rnn_cell)`` when RNNs are used, otherwise
                the placeholder ``[None, None]``.
        """
        if self.use_rnn:
            outputs = representation[key](observation[key], *rnn_hidden[key])
            hidden_new = (outputs['rnn_hidden'], outputs['rnn_cell'])
        else:
            outputs = representation[key](observation[key])
            hidden_new = [None, None]
        if self.use_parameter_sharing:
            q_inputs = torch.concat([outputs['state'], agent_ids], dim=-1)
        else:
            q_inputs = outputs['state']
        return q_inputs, hidden_new

    def forward(self, observation: Dict[str, Tensor], agent_ids: Tensor = None,
                avail_actions: Dict[str, Tensor] = None, agent_key: str = None,
                rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Returns actions of the policy.

        Parameters:
            observation (Dict[Tensor]): The input observations for the policies.
            agent_ids (Tensor): The agents' ids (for parameter sharing).
            avail_actions (Dict[str, Tensor]): Actions mask values, default is None.
                Entries equal to 0 mark actions that must not be selected.
            agent_key (str): Calculate actions for specified agent.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.

        Returns:
            rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN.
            argmax_action (Dict[str, Tensor]): The actions output by the policies.
            evalQ (Dict[str, Tensor]): The evaluations of observation-action pairs.
        """
        rnn_hidden_new, argmax_action, evalQ = {}, {}, {}
        agent_list = self.model_keys if agent_key is None else [agent_key]
        for key in agent_list:
            q_inputs, rnn_hidden_new[key] = self._basic_q_inputs(
                self.representation, key, observation, agent_ids, rnn_hidden)
            evalQ[key] = self.eval_Qhead[key](q_inputs)
            if avail_actions is None:
                argmax_action[key] = evalQ[key].argmax(dim=-1, keepdim=False)
            else:
                # Mask a detached copy so that the returned Q values stay untouched.
                masked_q = evalQ[key].clone().detach()
                unavailable = torch.as_tensor(avail_actions[key], device=masked_q.device) == 0
                masked_q[unavailable] = -1e10
                argmax_action[key] = masked_q.argmax(dim=-1, keepdim=False)
        return rnn_hidden_new, argmax_action, evalQ

    def Qtarget(self, observation: Dict[str, Tensor], agent_ids: Dict[str, Tensor],
                agent_key: str = None,
                rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Returns the Q^target of next observations and actions pairs.

        Parameters:
            observation (Dict[Tensor]): The observations.
            agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing).
            agent_key (str): Calculate actions for specified agent.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.

        Returns:
            rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN
                (``[None, None]`` per agent when no RNN is used, as in ``forward``).
            q_target: The evaluations of Q^target.
        """
        rnn_hidden_new, q_target = {}, {}
        agent_list = self.model_keys if agent_key is None else [agent_key]
        for key in agent_list:
            q_inputs, rnn_hidden_new[key] = self._basic_q_inputs(
                self.target_representation, key, observation, agent_ids, rnn_hidden)
            q_target[key] = self.target_Qhead[key](q_inputs)
        return rnn_hidden_new, q_target

    def copy_target(self):
        """Copy the evaluation networks' parameters into the target networks."""
        for ep, tp in zip(self.representation.parameters(), self.target_representation.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.eval_Qhead.parameters(), self.target_Qhead.parameters()):
            tp.data.copy_(ep)
class MixingQnetwork(BasicQnetwork):
    """
    The base class to implement value-decomposition based policy.

    Adds a mixer (and a deep-copied target mixer) on top of the per-agent
    Q networks of ``BasicQnetwork``.

    Args:
        action_space (Optional[Dict[str, Discrete]]): The action space, which type is gym.spaces.Discrete.
        n_agents (int): The number of agents.
        representation (ModuleDict): A dict of the representation module for all agents.
        mixer (Module): The mixer module that mix together the individual values to the total value.
        hidden_size (Sequence[int]): List of hidden units for fully connect layers.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Other arguments.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Discrete]],
                 n_agents: int,
                 representation: ModuleDict,
                 mixer: Optional[VDN_mixer] = None,
                 hidden_size: Sequence[int] = None,
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super(MixingQnetwork, self).__init__(action_space, n_agents, representation,
                                             hidden_size, normalize, initialize, activation, device,
                                             use_distributed_training, **kwargs)
        self.eval_Qtot = mixer
        # The target mixer is copied before the evaluation mixer is (optionally) wrapped.
        self.target_Qtot = deepcopy(self.eval_Qtot)
        if self.distributed_training:
            self.eval_Qtot = DistributedDataParallel(module=self.eval_Qtot, device_ids=[self.rank])

    @property
    def parameters_model(self):
        """Return the trainable parameters: mixer, representations, then Q heads."""
        return [
            *self.eval_Qtot.parameters(),
            *self.representation.parameters(),
            *self.eval_Qhead.parameters(),
        ]

    def _mixing_inputs(self, individual_values: Dict[str, Tensor]) -> Tensor:
        """Convert the per-agent value dict into one tensor reshaped to [-1, n_agents, 1].

        With parameter sharing only the first model key is read; otherwise the
        values of all model keys are concatenated on the last axis first.
        """
        if self.use_parameter_sharing:
            stacked = individual_values[self.model_keys[0]]
        else:
            stacked = torch.concat([individual_values[k] for k in self.model_keys], dim=-1)
        return stacked.reshape([-1, self.n_agents, 1])

    def Q_tot(self, individual_values: Dict[str, Tensor], states: Optional[Tensor] = None):
        """
        Returns the total Q values.

        Parameters:
            individual_values (Dict[str, Tensor]): The individual Q values of all agents.
            states (Optional[Tensor]): The global states if necessary, default is None.

        Returns:
            evalQ_tot (Tensor): The evaluated total Q values for the multi-agent team.
        """
        return self.eval_Qtot(self._mixing_inputs(individual_values), states)

    def Qtarget_tot(self, individual_values: Dict[str, Tensor], states: Optional[Tensor] = None):
        """
        Returns the total Q values with target networks.

        Parameters:
            individual_values (Dict[str, Tensor]): The individual Q values of all agents.
            states (Optional[Tensor]): The global states if necessary, default is None.

        Returns:
            q_target_tot (Tensor): The evaluated total Q values calculated by target networks.
        """
        return self.target_Qtot(self._mixing_inputs(individual_values), states)

    def copy_target(self):
        """Copy the evaluation networks' parameters into their target counterparts."""
        eval_target_pairs = (
            (self.representation, self.target_representation),
            (self.eval_Qhead, self.target_Qhead),
            (self.eval_Qtot, self.target_Qtot),
        )
        for eval_net, target_net in eval_target_pairs:
            for ep, tp in zip(eval_net.parameters(), target_net.parameters()):
                tp.data.copy_(ep)
class Weighted_MixingQnetwork(MixingQnetwork):
    """
    The base class to implement weighted value-decomposition based policy.

    On top of ``MixingQnetwork`` it keeps a second ("centralized") set of Q heads
    and a feedforward mixer, each with a deep-copied target version.

    Args:
        action_space (Optional[Dict[str, Discrete]]): The action space, which type is gym.spaces.Discrete.
        n_agents (int): The number of agents.
        representation (ModuleDict): A dict of the representation module for all agents.
        mixer (Module): The mixer module that mix together the individual values to the total value.
        ff_mixer (Module): The feedforward mixer module that mix together the individual values to the total value.
        hidden_size (Sequence[int]): List of hidden units for fully connect layers.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Other arguments.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Discrete]],
                 n_agents: int,
                 representation: ModuleDict,
                 mixer: Optional[VDN_mixer] = None,
                 ff_mixer: Optional[QMIX_FF_mixer] = None,
                 hidden_size: Sequence[int] = None,
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super(Weighted_MixingQnetwork, self).__init__(action_space, n_agents, representation, mixer,
                                                      hidden_size, normalize, initialize, activation, device,
                                                      use_distributed_training, **kwargs)
        heads = self.eval_Qhead
        if self.distributed_training:
            # The parent already wrapped eval_Qhead in DDP; copy the underlying
            # modules so the centralized heads are wrapped exactly once below.
            heads = ModuleDict({
                key: head.module if isinstance(head, DistributedDataParallel) else head
                for key, head in self.eval_Qhead.items()
            })
        self.eval_Qhead_centralized = deepcopy(heads)
        self.target_Qhead_centralized = deepcopy(self.eval_Qhead_centralized)
        self.ff_mixer = ff_mixer
        self.target_ff_mixer = deepcopy(self.ff_mixer)
        if self.distributed_training:
            for key in self.model_keys:
                # A DDP wrapper is not subscriptable, so the wrapped head is stored directly.
                self.eval_Qhead_centralized[key] = DistributedDataParallel(
                    module=self.eval_Qhead_centralized[key], device_ids=[self.rank])
            self.ff_mixer = DistributedDataParallel(module=self.ff_mixer, device_ids=[self.rank])

    @property
    def parameters_model(self):
        """Return the trainable parameters: mixer, feedforward mixer, representations and both head sets."""
        parameters_model = list(self.eval_Qtot.parameters()) + list(self.ff_mixer.parameters()) + list(
            self.representation.parameters()) + list(self.eval_Qhead.parameters()) + list(
            self.eval_Qhead_centralized.parameters())
        return parameters_model

    def _centralized_values(self, representation, heads, observation, agent_ids, agent_key, rnn_hidden):
        """Evaluate ``heads`` on the outputs of ``representation`` for the selected agents.

        Returns the new RNN hidden variables (``[None, None]`` per agent when no
        RNN is used) and the per-agent head outputs.
        """
        rnn_hidden_new, values = {}, {}
        agent_list = self.model_keys if agent_key is None else [agent_key]
        for key in agent_list:
            if self.use_rnn:
                outputs = representation[key](observation[key], *rnn_hidden[key])
                rnn_hidden_new[key] = (outputs['rnn_hidden'], outputs['rnn_cell'])
            else:
                outputs = representation[key](observation[key])
                rnn_hidden_new[key] = [None, None]
            if self.use_parameter_sharing:
                q_inputs = torch.concat([outputs['state'], agent_ids], dim=-1)
            else:
                q_inputs = outputs['state']
            values[key] = heads[key](q_inputs)
        return rnn_hidden_new, values

    def q_centralized(self, observation: Dict[str, Tensor], agent_ids: Dict[str, Tensor],
                      agent_key: str = None,
                      rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Returns the centralised Q value.

        Parameters:
            observation (Dict[Tensor]): The observations.
            agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing).
            agent_key (str): Calculate actions for specified agent.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.

        Returns:
            rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN.
            evalQ_cent (Dict[str, Tensor]): The evaluated centralised Q values.
        """
        return self._centralized_values(self.representation, self.eval_Qhead_centralized,
                                        observation, agent_ids, agent_key, rnn_hidden)

    def target_q_centralized(self, observation: Dict[str, Tensor], agent_ids: Dict[str, Tensor],
                             agent_key: str = None,
                             rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Returns the centralised Q value with target networks.

        Parameters:
            observation (Dict[Tensor]): The observations.
            agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing).
            agent_key (str): Calculate actions for specified agent.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.

        Returns:
            rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN.
            q_target_cent (Dict[str, Tensor]): The evaluated centralised Q values with target networks.
        """
        return self._centralized_values(self.target_representation, self.target_Qhead_centralized,
                                        observation, agent_ids, agent_key, rnn_hidden)

    def _ff_mixing_inputs(self, individual_values: Dict[str, Tensor]) -> Tensor:
        """Convert the per-agent value dict into one tensor reshaped to [-1, n_agents, 1].

        With parameter sharing only the first model key is read; otherwise the
        values of all model keys are concatenated on the last axis first.
        """
        if self.use_parameter_sharing:
            stacked = individual_values[self.model_keys[0]]
        else:
            stacked = torch.concat([individual_values[k] for k in self.model_keys], dim=-1)
        return stacked.reshape([-1, self.n_agents, 1])

    def q_feedforward(self, individual_values: Dict[str, Tensor], states: Optional[Tensor] = None):
        """
        Returns the total Q values with feedforward mixer networks.

        Parameters:
            individual_values (Dict[str, Tensor]): The individual Q values of all agents.
            states (Optional[Tensor]): The global states if necessary, default is None.

        Returns:
            evalQ_tot (Tensor): The evaluated total Q values for the multi-agent team.
        """
        return self.ff_mixer(self._ff_mixing_inputs(individual_values), states)

    def target_q_feedforward(self, individual_values: Dict[str, Tensor], states: Optional[Tensor] = None):
        """
        Returns the total Q values with target feedforward mixer networks.

        Parameters:
            individual_values (Dict[str, Tensor]): The individual Q values of all agents.
            states (Optional[Tensor]): The global states if necessary, default is None.

        Returns:
            q_target_tot (Tensor): The evaluated total Q values for the multi-agent team.
        """
        return self.target_ff_mixer(self._ff_mixing_inputs(individual_values), states)

    def copy_target(self):
        """Copy the evaluation networks' parameters into their target counterparts."""
        for ep, tp in zip(self.representation.parameters(), self.target_representation.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.eval_Qhead.parameters(), self.target_Qhead.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.eval_Qhead_centralized.parameters(), self.target_Qhead_centralized.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.eval_Qtot.parameters(), self.target_Qtot.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.ff_mixer.parameters(), self.target_ff_mixer.parameters()):
            tp.data.copy_(ep)
class Qtran_MixingQnetwork(BasicQnetwork):
    """
    The base class to implement QTRAN based policy.

    Besides the per-agent Q networks of ``BasicQnetwork`` it holds a joint
    network ``qtran_net`` (with a deep-copied target) and a mixer ``q_tot``.

    Args:
        action_space (Optional[Dict[str, Discrete]]): The action space, which type is gym.spaces.Discrete.
        n_agents (int): The number of agents.
        representation (ModuleDict): A dict of the representation module for all agents.
        mixer (Module): The mixer module that mix together the individual values to the total value.
        qtran_mixer (Module): The module that returns the joint Q and V values
            from states, hidden states and one-hot actions.
        hidden_size (Sequence[int]): List of hidden units for fully connect layers.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Other arguments.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Discrete]],
                 n_agents: int,
                 representation: ModuleDict,
                 mixer: Optional[VDN_mixer] = None,
                 qtran_mixer: Module = None,
                 hidden_size: Sequence[int] = None,
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super(Qtran_MixingQnetwork, self).__init__(action_space, n_agents, representation,
                                                   hidden_size, normalize, initialize, activation, device,
                                                   use_distributed_training, **kwargs)
        self.n_actions_list = [a_space.n for a_space in action_space.values()]
        self.n_actions_max = max(self.n_actions_list)
        self.qtran_net = qtran_mixer
        self.target_qtran_net = deepcopy(qtran_mixer)
        self.q_tot = mixer
        if self.distributed_training:
            self.qtran_net = DistributedDataParallel(module=self.qtran_net, device_ids=[self.rank])
            self.q_tot = DistributedDataParallel(module=self.q_tot, device_ids=[self.rank])

    @property
    def parameters_model(self):
        """Return the trainable parameters: joint network, mixer, representations and Q heads."""
        parameters_model = list(self.qtran_net.parameters()) + list(self.q_tot.parameters()) + \
                           list(self.representation.parameters()) + list(self.eval_Qhead.parameters())
        return parameters_model

    def forward(self, observation: Dict[str, Tensor], agent_ids: Tensor = None,
                avail_actions: Dict[str, Tensor] = None, agent_key: str = None,
                rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Returns actions of the policy.

        Parameters:
            observation (Dict[Tensor]): The input observations for the policies.
            agent_ids (Tensor): The agents' ids (for parameter sharing).
            avail_actions (Dict[str, Tensor]): Actions mask values, default is None.
                Entries equal to 0 mark actions that must not be selected.
            agent_key (str): Calculate actions for specified agent.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.

        Returns:
            rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN.
            rep_hidden_state (Dict[str, Tensor]): The hidden states.
            argmax_action (Dict[str, Tensor]): The actions output by the policies.
            evalQ (Dict[str, Tensor]): The evaluations of observation-action pairs.
        """
        rnn_hidden_new, rep_hidden_state, argmax_action, evalQ = {}, {}, {}, {}
        agent_list = self.model_keys if agent_key is None else [agent_key]
        for key in agent_list:
            if self.use_rnn:
                outputs = self.representation[key](observation[key], *rnn_hidden[key])
                rnn_hidden_new[key] = (outputs['rnn_hidden'], outputs['rnn_cell'])
            else:
                outputs = self.representation[key](observation[key])
                rnn_hidden_new[key] = [None, None]

            if self.use_parameter_sharing:
                q_inputs = torch.concat([outputs['state'], agent_ids], dim=-1)
            else:
                q_inputs = outputs['state']

            rep_hidden_state[key] = outputs['state']
            evalQ[key] = self.eval_Qhead[key](q_inputs)

            if avail_actions is None:
                argmax_action[key] = evalQ[key].argmax(dim=-1, keepdim=False)
            else:
                # Mask a detached copy so that the returned Q values stay untouched.
                masked_q = evalQ[key].clone().detach()
                unavailable = torch.as_tensor(avail_actions[key], device=masked_q.device) == 0
                masked_q[unavailable] = -1e10
                argmax_action[key] = masked_q.argmax(dim=-1, keepdim=False)
        return rnn_hidden_new, rep_hidden_state, argmax_action, evalQ

    def Qtarget(self, observation: Dict[str, Tensor], agent_ids: Dict[str, Tensor],
                agent_key: str = None,
                rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Returns the Q^target of next observations and actions pairs.

        Parameters:
            observation (Dict[Tensor]): The observations.
            agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing).
            agent_key (str): Calculate actions for specified agent.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.

        Returns:
            rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN.
            rep_hidden_state (Dict[str, Tensor]): The hidden states.
            q_target: The evaluations of Q^target.
        """
        rnn_hidden_new, q_target, rep_hidden_state = {}, {}, {}
        agent_list = self.model_keys if agent_key is None else [agent_key]
        for key in agent_list:
            if self.use_rnn:
                outputs = self.target_representation[key](observation[key], *rnn_hidden[key])
                rnn_hidden_new[key] = (outputs['rnn_hidden'], outputs['rnn_cell'])
            else:
                outputs = self.target_representation[key](observation[key])
                rnn_hidden_new[key] = [None, None]
            if self.use_parameter_sharing:
                q_inputs = torch.concat([outputs['state'], agent_ids], dim=-1)
            else:
                q_inputs = outputs['state']
            rep_hidden_state[key] = outputs['state']
            q_target[key] = self.target_Qhead[key](q_inputs)
        return rnn_hidden_new, rep_hidden_state, q_target

    def Q_tot(self, individual_values: Dict[str, Tensor], states: Optional[Tensor] = None):
        """
        Returns the total Q values.

        Parameters:
            individual_values (Dict[str, Tensor]): The individual Q values of all agents.
            states (Optional[Tensor]): The global states if necessary, default is None.

        Returns:
            evalQ_tot (Tensor): The evaluated total Q values for the multi-agent team.
        """
        if self.use_parameter_sharing:
            # Only the first model key is read when parameters are shared.
            individual_inputs = individual_values[self.model_keys[0]]
        else:
            # Otherwise the values of all model keys are concatenated on the last axis.
            individual_inputs = torch.concat([individual_values[k] for k in self.model_keys], dim=-1)
        individual_inputs = individual_inputs.reshape([-1, self.n_agents, 1])
        return self.q_tot(individual_inputs, states)

    def _qtran_inputs(self, states, hidden_states, actions, agent_mask, avail_actions):
        """Build the ``(states, hidden_states_input, actions_onehot)`` triple for the joint network.

        Shared by ``Q_tran`` and ``Q_tran_target``, which differ only in the
        network that consumes the triple.
        """
        if not self.use_parameter_sharing:
            # One entry per model key, stacked on a new axis 1.
            # agent_mask / avail_actions are not applied in this branch.
            hidden_states_input = torch.cat([hidden_states[k].unsqueeze(1) for k in self.model_keys], dim=1)
            actions_onehot = torch.cat(
                [one_hot(actions[k].long(), self.n_actions_max).unsqueeze(1) for k in self.model_keys], dim=1)
            return states, hidden_states_input, actions_onehot

        seq_len = states.shape[1] if self.use_rnn else 1
        batch_size = states.shape[0]
        key = self.model_keys[0]
        dim_hidden_state = hidden_states[key].shape[-1]
        actions_onehot = one_hot(actions[key].long(), self.action_space[key].n)
        if self.use_rnn:
            actions_onehot = actions_onehot.reshape(batch_size, self.n_agents, seq_len, -1)
            hidden_states_input = hidden_states[key].reshape([-1, self.n_agents, seq_len, dim_hidden_state])
        else:
            actions_onehot = actions_onehot.reshape(batch_size, self.n_agents, -1)
            hidden_states_input = hidden_states[key].reshape([-1, self.n_agents, dim_hidden_state])

        if avail_actions is not None:
            # one_hot() yields an int64 tensor; an in-place multiply would fail
            # for a floating-point mask, so the product is computed out of place.
            # NOTE(review): the mask is assumed to broadcast against
            # actions_onehot - confirm against the caller.
            actions_onehot = actions_onehot * avail_actions[key]

        if agent_mask is not None:
            if self.use_rnn:
                mask = agent_mask[key].reshape(
                    batch_size, self.n_agents, seq_len, 1).repeat(1, 1, 1, dim_hidden_state)
            else:
                mask = agent_mask[key].reshape(batch_size, self.n_agents, 1).repeat(1, 1, dim_hidden_state)
            hidden_states_input = hidden_states_input * mask

        if self.use_rnn:
            # Fold the sequence axis into the batch axis for all three inputs.
            states = states.reshape(batch_size * seq_len, -1)
            hidden_states_input = hidden_states_input.transpose(1, 2).reshape(-1, self.n_agents, dim_hidden_state)
            actions_onehot = actions_onehot.transpose(1, 2).reshape(-1, self.n_agents, self.n_actions_max)
        return states, hidden_states_input, actions_onehot

    def Q_tran(self, states: Tensor, hidden_states: Dict[str, Tensor], actions: Dict[str, Tensor],
               agent_mask: Dict[str, Tensor] = None, avail_actions: Dict[str, Tensor] = None):
        """
        Returns the joint Q and V values of the evaluation network.

        Parameters:
            states (Tensor): The global states.
            hidden_states (Dict[str, Tensor]): The hidden states.
            actions (Dict[str, Tensor]): The executed actions.
            agent_mask (Dict[str, Tensor]): Agent mask values, default is None.
            avail_actions (Dict[str, Tensor]): Actions mask values, default is None.

        Returns:
            q_jt (Tensor): The evaluated joint Q values.
            v_jt (Tensor): The evaluated joint V values.
        """
        inputs = self._qtran_inputs(states, hidden_states, actions, agent_mask, avail_actions)
        q_jt, v_jt = self.qtran_net(*inputs)
        return q_jt, v_jt

    def Q_tran_target(self, states: Tensor, hidden_states: Dict[str, Tensor], actions: Dict[str, Tensor],
                      agent_mask: Dict[str, Tensor] = None, avail_actions: Dict[str, Tensor] = None):
        """
        Returns the joint Q and V values of the target network.

        Parameters:
            states (Tensor): The global states.
            hidden_states (Dict[str, Tensor]): The hidden states.
            actions (Dict[str, Tensor]): The executed actions.
            agent_mask (Dict[str, Tensor]): Agent mask values, default is None.
            avail_actions (Dict[str, Tensor]): Actions mask values, default is None.

        Returns:
            q_jt (Tensor): The evaluated joint Q values.
            v_jt (Tensor): The evaluated joint V values.
        """
        inputs = self._qtran_inputs(states, hidden_states, actions, agent_mask, avail_actions)
        q_jt, v_jt = self.target_qtran_net(*inputs)
        return q_jt, v_jt

    def copy_target(self):
        """Copy the evaluation networks' parameters into their target counterparts."""
        for ep, tp in zip(self.representation.parameters(), self.target_representation.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.eval_Qhead.parameters(), self.target_Qhead.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.qtran_net.parameters(), self.target_qtran_net.parameters()):
            tp.data.copy_(ep)
class DCG_policy(Module):
    """
    The deep coordination graph policy.

    Holds the representations, the utility and payoff modules (each with a
    deep-copied target version), the coordination graph and, when
    ``hidden_size_bias`` is given, a state-dependent bias network.

    Args:
        action_space (Optional[Dict[str, Discrete]]): The action space, which type is gym.spaces.Discrete.
        n_agents (int): The number of agents.
        representation (ModuleDict): A dict of the representation module for all agents.
        utility (Module): The utility module that outputs an agent's utility value.
        payoffs (Module): The payoff module that outputs two agents' payoff value.
        dcgraph (Module): The deep coordination graph module.
        hidden_size_bias (Sequence[int]): List of hidden units for fully connect layers of bias net.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        **kwargs: Other arguments. The constructor reads the keys
            'use_parameter_sharing', 'model_keys', 'rnn', 'use_rnn' and, when
            ``hidden_size_bias`` is given, 'state_dim'.
    """

    def __init__(self,
                 action_space: Discrete,
                 n_agents: int,
                 representation: Module,
                 utility: Optional[Module] = None,
                 payoffs: Optional[Module] = None,
                 dcgraph: Optional[Module] = None,
                 hidden_size_bias: Sequence[int] = None,
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 **kwargs):
        super(DCG_policy, self).__init__()
        self.device = device
        self.action_space = action_space
        self.n_agents = n_agents
        self.use_parameter_sharing = kwargs['use_parameter_sharing']
        self.model_keys = kwargs['model_keys']
        self.representation_info_shape = {key: representation[key].output_shapes for key in self.model_keys}
        self.lstm = kwargs['rnn'] == "LSTM"
        self.use_rnn = bool(kwargs['use_rnn'])

        self.representation = representation
        self.target_representation = deepcopy(self.representation)
        self.utility = utility
        self.target_utility = deepcopy(self.utility)
        self.payoffs = payoffs
        self.target_payoffs = deepcopy(self.payoffs)
        self.graph = dcgraph

        # The bias network only exists when hidden_size_bias is given.
        self.dcg_s = False
        if hidden_size_bias is not None:
            self.dcg_s = True
            state_dim = kwargs['state_dim']
            # NOTE(review): argument order follows the other BasicQhead call in this
            # file (input dim, number of outputs, hidden sizes, ...); the extra
            # positional 0 that used to follow the output size was dropped - confirm
            # against BasicQhead's signature.
            self.bias = BasicQhead(state_dim, 1, hidden_size_bias,
                                   normalize, initialize, activation, device)
            self.target_bias = deepcopy(self.bias)

    @property
    def parameters_model(self):
        """Return the trainable parameters: representations, utility, payoffs and the optional bias net."""
        parameters_model = list(self.representation.parameters()) + \
                           list(self.utility.parameters()) + \
                           list(self.payoffs.parameters())
        if self.dcg_s:
            parameters_model += list(self.bias.parameters())
        return parameters_model

    def get_hidden_states(self, batch_size: int, observation: Dict[str, Tensor],
                          rnn_hidden: Optional[Dict[str, List[Tensor]]] = None,
                          use_target_net=False):
        """
        Get the hidden states of the representations for all agents.

        Args:
            batch_size (int): The batch size.
            observation (Dict[Tensor]): The input observations for the policies.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN.
            use_target_net (bool): Whether to use a target network or not.

        Returns:
            rnn_hidden_new: The RNN hidden states for next step calculating
                (``[None, None]`` per agent when no RNN is used).
            hidden_states_n: The hidden states of the representations, reshaped to
                (batch_size, seq_len, n_agents, -1) with RNNs and to
                (batch_size, n_agents, -1) otherwise.
        """
        rnn_hidden_new, hidden_states = {}, {}
        seq_len = observation[self.model_keys[0]].shape[1] if self.use_rnn else 1
        representation = self.target_representation if use_target_net else self.representation

        for key in self.model_keys:
            if self.use_rnn:
                outputs = representation[key](observation[key], *rnn_hidden[key])
                rnn_hidden_new[key] = (outputs['rnn_hidden'], outputs['rnn_cell'])
            else:
                outputs = representation[key](observation[key])
                rnn_hidden_new[key] = [None, None]
            hidden_states[key] = outputs['state']

        if self.use_parameter_sharing:
            # A single shared model produces the states of all agents at once.
            hidden_states_n = hidden_states[self.model_keys[0]].reshape(batch_size, self.n_agents, seq_len, -1)
            hidden_states_n = hidden_states_n.transpose(1, 2)
        else:
            # A list (rather than itemgetter) keeps this correct for a single model key,
            # where itemgetter would hand torch.stack a bare tensor.
            hidden_states_n = torch.stack([hidden_states[k] for k in self.model_keys], dim=-2)

        if self.use_rnn:
            hidden_states_n = hidden_states_n.reshape(batch_size, seq_len, self.n_agents, -1)
        else:
            hidden_states_n = hidden_states_n.reshape(batch_size, self.n_agents, -1)
        # Return the freshly computed hidden variables, not the ones passed in.
        return rnn_hidden_new, hidden_states_n

    def copy_target(self):
        """Copy the evaluation networks' parameters into their target counterparts."""
        for ep, tp in zip(self.representation.parameters(), self.target_representation.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.utility.parameters(), self.target_utility.parameters()):
            tp.data.copy_(ep)
        for ep, tp in zip(self.payoffs.parameters(), self.target_payoffs.parameters()):
            tp.data.copy_(ep)
        if self.dcg_s:
            for ep, tp in zip(self.bias.parameters(), self.target_bias.parameters()):
                tp.data.copy_(ep)
class MFQnetwork(Module):
    """
    The base class to implement Mean Field Reinforcement Learning - MFQ.

    For every model key this builds an MLP that embeds the neighbours' mean action and a
    Q head that consumes the representation state concatenated with that embedding,
    plus deep-copied target versions of both.

    Args:
        action_space (Optional[Dict[str, Discrete]]): The action space, which type is gym.spaces.Discrete.
        n_agents (int): The number of agents.
        representation (ModuleDict): A dict of the representation module for all agents.
        hidden_size (Sequence[int]): List of hidden units for fully connect layers.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters' initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Must provide 'use_parameter_sharing', 'model_keys', 'rnn', 'use_rnn',
            'policy_type', 'action_embedding_hidden_size' and 'temperature'.
    """

    def __init__(self,
                 action_space: Discrete,
                 n_agents: int,
                 representation: Module,
                 hidden_size: Sequence[int] = None,
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super(MFQnetwork, self).__init__()
        self.device = device
        self.action_space = action_space
        self.n_agents = n_agents
        self.n_actions_list = [space.n for space in self.action_space.values()]
        self.n_actions_max = max(self.n_actions_list)
        self.use_parameter_sharing = kwargs['use_parameter_sharing']
        self.model_keys = kwargs['model_keys']
        self.representation_info_shape = {key: representation[key].output_shapes for key in self.model_keys}
        self.lstm = kwargs["rnn"] == "LSTM"
        self.use_rnn = bool(kwargs["use_rnn"])
        # Action-selection rule used in forward(): "Boltzmann" or "greedy".
        self.policy_type = kwargs['policy_type']

        self.representation = representation
        self.target_representation = deepcopy(self.representation)

        self.dim_input_action_embedding, self.dim_input_Q, self.n_actions = {}, {}, {}
        self.action_mean_embedding = ModuleDict()
        self.eval_Qhead, self.target_Qhead, self.target_action_mean_embedding = ModuleDict(), ModuleDict(), ModuleDict()
        for key in self.model_keys:
            dim_embedding_in = self.n_actions_max
            dim_representation = self.representation_info_shape[key]['state'][0]
            embedding_hidden = kwargs['action_embedding_hidden_size']
            dim_q_in = dim_representation + embedding_hidden[-1]
            self.n_actions[key] = self.action_space[key].n
            if self.use_parameter_sharing:
                # Shared models also receive the agent id vector (width n_agents) as input.
                dim_embedding_in += self.n_agents
                dim_q_in += self.n_agents
            self.dim_input_action_embedding[key] = dim_embedding_in
            self.dim_input_Q[key] = dim_q_in

            self.action_mean_embedding[key] = Basic_MLP((dim_embedding_in,), embedding_hidden,
                                                        normalize, initialize, activation, device)
            self.eval_Qhead[key] = BasicQhead(dim_q_in, self.n_actions[key], hidden_size,
                                              normalize, initialize, activation, device)
            self.target_action_mean_embedding[key] = deepcopy(self.action_mean_embedding[key])
            self.target_Qhead[key] = deepcopy(self.eval_Qhead[key])

        self.softmax = torch.nn.Softmax(dim=-1)
        self.temperature = kwargs['temperature']

        # Prepare DDP module: only the online (trainable) networks are wrapped.
        self.distributed_training = use_distributed_training
        if self.distributed_training:
            self.rank = int(os.environ["RANK"])
            for key in self.model_keys:
                if self.representation[key]._get_name() != "Basic_Identical":
                    self.representation[key] = DistributedDataParallel(module=self.representation[key],
                                                                       device_ids=[self.rank])
                self.action_mean_embedding[key] = DistributedDataParallel(module=self.action_mean_embedding[key],
                                                                          device_ids=[self.rank])
                self.eval_Qhead[key] = DistributedDataParallel(module=self.eval_Qhead[key],
                                                               device_ids=[self.rank])

    @property
    def parameters_model(self):
        """Per-key list of trainable parameters: representation, mean-action embedding, Q head."""
        return {
            key: (list(self.representation[key].parameters())
                  + list(self.action_mean_embedding[key].parameters())
                  + list(self.eval_Qhead[key].parameters()))
            for key in self.model_keys
        }
[docs] def forward(self, observation: Dict[str, Tensor], agent_ids: Tensor = None, actions_mean: Dict[str, Tensor] = None, avail_actions: Dict[str, Tensor] = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns actions of the policy. Parameters: observation (Dict[Tensor]): The input observations for the policies. agent_ids (Tensor): The agents' ids (for parameter sharing). actions_mean (Dict[str, Tensor]): The mean actions of each agent's neighbors. avail_actions (Dict[str, Tensor]): Actions mask values, default is None. agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. argmax_action (Dict[str, Tensor]): The actions output by the policies. evalQ (Dict[str, Tensor]): The evaluations of observation-action pairs. """ rnn_hidden_new, actions, evalQ = {}, {}, {} agent_list = self.model_keys if agent_key is None else [agent_key] actions_mean = {key: Tensor(actions_mean[key]).to(self.device) for key in agent_list} if avail_actions is not None: avail_actions = {key: Tensor(avail_actions[key]).to(self.device) for key in agent_list} for key in agent_list: if self.use_rnn: outputs = self.representation[key](observation[key], *rnn_hidden[key]) rnn_hidden_new[key] = (outputs['rnn_hidden'], outputs['rnn_cell']) else: outputs = self.representation[key](observation[key]) rnn_hidden_new[key] = [None, None] # mean actions embedding if self.use_parameter_sharing: action_embedding_input = torch.cat([actions_mean[key], agent_ids], dim=-1) act_embedding = self.action_mean_embedding[key](action_embedding_input) q_inputs = torch.cat([outputs['state'], act_embedding['state'], agent_ids], dim=-1) else: act_embedding = self.action_mean_embedding[key](actions_mean[key]) q_inputs = torch.cat([outputs['state'], act_embedding['state']], dim=-1) evalQ[key] = 
self.eval_Qhead[key](q_inputs) evalQ_detach = evalQ[key].clone().detach() if avail_actions is not None: evalQ_detach[avail_actions[key] == 0] = -1e10 if self.policy_type == "Boltzmann": actions_prob = self.get_boltzmann_policy(evalQ_detach) actions[key] = Categorical(probs=actions_prob).sample() elif self.policy_type == "greedy": actions[key] = evalQ_detach.argmax(dim=-1, keepdim=False) else: raise NotImplementedError return rnn_hidden_new, actions, evalQ
[docs] def get_boltzmann_policy(self, q): actions_prob = self.softmax(q / self.temperature) return actions_prob
[docs] def get_mean_actions(self, actions: Dict[str, Tensor], agent_mask_tensor: Tensor, batch_size: int): if self.use_parameter_sharing: actions_tensor = actions[self.model_keys[0]].reshape([-1, self.n_agents]) else: actions_tensor = torch.stack(itemgetter(*self.model_keys)(actions), dim=-1).reshape([-1, self.n_agents]) actions_onehot = one_hot(actions_tensor, num_classes=self.n_actions_max) # count alive neighbors _eyes = torch.eye(self.n_agents).unsqueeze(0).repeat(batch_size, 1, 1).to(self.device) agent_mask_diagonal = agent_mask_tensor.unsqueeze(-1).repeat(1, 1, self.n_agents) * _eyes agent_mask_neighbors = agent_mask_tensor.unsqueeze(-1).repeat(1, 1, self.n_agents) - agent_mask_diagonal agent_alive_neighbors = agent_mask_neighbors.sum(dim=-1, keepdim=True) # calculate mean actions of each agent's neighbors agent_mask_repeat = agent_mask_tensor.unsqueeze(-1).repeat(1, 1, self.n_actions_max) actions_onehot = actions_onehot * agent_mask_repeat actions_sum = actions_onehot.sum(dim=-2, keepdim=True).repeat(1, self.n_agents, 1) actions_neighbors_sum = actions_sum - actions_onehot # Sum of other agents' actions. actions_mean_masked = actions_neighbors_sum * agent_mask_repeat / agent_alive_neighbors return actions_mean_masked
[docs] def Qtarget(self, observation: Dict[str, Tensor], actions_mean: Dict[str, Tensor], agent_ids: Dict[str, Tensor], agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns the Q^target of next observations and actions pairs. Parameters: observation (Dict[Tensor]): The observations. actions_mean (Dict[str, Tensor]): The mean of each agent's neighbors. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_target: The evaluations of Q^target. """ rnn_hidden_new, q_target = {}, {} agent_list = self.model_keys if agent_key is None else [agent_key] actions_mean = {key: Tensor(actions_mean[key]).to(self.device) for key in agent_list} for key in agent_list: if self.use_rnn: outputs = self.target_representation[key](observation[key], *rnn_hidden[key]) rnn_hidden_new[key] = (outputs['rnn_hidden'], outputs['rnn_cell']) else: outputs = self.target_representation[key](observation[key]) rnn_hidden_new[key] = None # mean actions embedding if self.use_parameter_sharing: input_embedding = torch.cat([actions_mean[key], agent_ids], dim=-1) act_embedding = self.target_action_mean_embedding[key](input_embedding) q_inputs = torch.cat([outputs['state'], act_embedding['state'], agent_ids], dim=-1) else: act_embedding = self.target_action_mean_embedding[key](actions_mean[key]) q_inputs = torch.cat([outputs['state'], act_embedding['state']], dim=-1) q_target[key] = self.target_Qhead[key](q_inputs) return rnn_hidden_new, q_target
[docs] def copy_target(self): for ep, tp in zip(self.representation.parameters(), self.target_representation.parameters()): tp.data.copy_(ep) for ep, tp in zip(self.action_mean_embedding.parameters(), self.target_action_mean_embedding.parameters()): tp.data.copy_(ep) for ep, tp in zip(self.eval_Qhead.parameters(), self.target_Qhead.parameters()): tp.data.copy_(ep)
class Independent_DDPG_Policy(Module):
    """
    The policy of deep deterministic policy gradient, with one actor and one critic per model key.

    Each actor/critic (and each representation) gets a deep-copied target network.

    Args:
        action_space (Optional[Dict[str, Box]]): The action space.
        n_agents (int): The number of agents.
        actor_representation (Optional[ModuleDict]): The representation module for actor network.
        critic_representation (Optional[ModuleDict]): The representation module for critic network.
        actor_hidden_size (Sequence[int]): List of hidden units for actor network.
        critic_hidden_size (Sequence[int]): List of hidden units for critic network.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        activation_action (Optional[ModuleType]): The activation of final layer to bound the actions.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Must provide 'use_parameter_sharing', 'model_keys', 'rnn' and 'use_rnn'.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Box]],
                 n_agents: int,
                 actor_representation: Optional[ModuleDict],
                 critic_representation: Optional[ModuleDict],
                 actor_hidden_size: Sequence[int],
                 critic_hidden_size: Sequence[int],
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 activation_action: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super(Independent_DDPG_Policy, self).__init__()
        self.device = device
        self.action_space = action_space
        self.n_agents = n_agents
        self.use_parameter_sharing = kwargs['use_parameter_sharing']
        self.model_keys = kwargs['model_keys']
        self.actor_representation_info_shape = {key: actor_representation[key].output_shapes
                                                for key in self.model_keys}
        self.critic_representation_info_shape = {key: critic_representation[key].output_shapes
                                                 for key in self.model_keys}
        self.lstm = kwargs["rnn"] == "LSTM"
        self.use_rnn = bool(kwargs["use_rnn"])

        self.actor_representation = actor_representation
        self.critic_representation = critic_representation
        self.target_actor_representation = deepcopy(self.actor_representation)
        self.target_critic_representation = deepcopy(self.critic_representation)

        self.actor, self.target_actor = ModuleDict(), ModuleDict()
        self.critic, self.target_critic = ModuleDict(), ModuleDict()
        for key in self.model_keys:
            dim_action = self.action_space[key].shape[-1]
            dim_actor_rep = self.actor_representation[key].output_shapes['state'][0]
            dim_critic_rep = self.critic_representation[key].output_shapes['state'][0]
            # Subclasses override _get_actor_critic_input to change what the critic consumes.
            dim_actor_in, dim_actor_out, dim_critic_in = self._get_actor_critic_input(
                dim_actor_rep, dim_action, dim_critic_rep, n_agents)

            self.actor[key] = ActorNet(dim_actor_in, dim_actor_out, actor_hidden_size,
                                       normalize, initialize, activation, activation_action, device)
            self.critic[key] = CriticNet(dim_critic_in, critic_hidden_size,
                                         normalize, initialize, activation, device)
            self.target_actor[key] = deepcopy(self.actor[key])
            self.target_critic[key] = deepcopy(self.critic[key])

        # Prepare DDP module: only the online (trainable) networks are wrapped.
        self.distributed_training = use_distributed_training
        if self.distributed_training:
            self.rank = int(os.environ["RANK"])
            for key in self.model_keys:
                if self.actor_representation[key]._get_name() != "Basic_Identical":
                    self.actor_representation[key] = DistributedDataParallel(
                        module=self.actor_representation[key], device_ids=[self.rank])
                if self.critic_representation[key]._get_name() != "Basic_Identical":
                    self.critic_representation[key] = DistributedDataParallel(
                        module=self.critic_representation[key], device_ids=[self.rank])
                self.actor[key] = DistributedDataParallel(module=self.actor[key], device_ids=[self.rank])
                self.critic[key] = DistributedDataParallel(module=self.critic[key], device_ids=[self.rank])

    @property
    def parameters_actor(self):
        """Per-key list of actor-side parameters (actor representation followed by actor head)."""
        return {
            key: list(self.actor_representation[key].parameters()) + list(self.actor[key].parameters())
            for key in self.model_keys
        }

    @property
    def parameters_critic(self):
        """Per-key list of critic-side parameters (critic representation followed by critic head)."""
        return {
            key: list(self.critic_representation[key].parameters()) + list(self.critic[key].parameters())
            for key in self.model_keys
        }

    def _get_actor_critic_input(self, dim_actor_rep, dim_action, dim_critic_rep, n_agents):
        """
        Returns the input dimensions of actor and critic networks.

        Parameters:
            dim_actor_rep: The dimension of the output of actor presentation.
            dim_action: The dimension of actions.
            dim_critic_rep: The dimension of the output of critic presentation.
            n_agents: The number of agents.

        Returns:
            dim_actor_in: The dimension of input of the actor networks.
            dim_actor_out: The dimension of output of the actor networks (the action dimension).
            dim_critic_in: The dimension of the input of critic networks.
        """
        # With parameter sharing the agent id vector (width n_agents) is appended to both inputs.
        id_width = n_agents if self.use_parameter_sharing else 0
        dim_actor_in = dim_actor_rep + id_width
        dim_actor_out = dim_action
        dim_critic_in = dim_critic_rep + dim_action + id_width
        return dim_actor_in, dim_actor_out, dim_critic_in
[docs] def forward(self, observation: Dict[str, Tensor], agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns actions of the policy. Parameters: observation (Dict[Tensor]): The input observations for the policies. agent_ids (Tensor): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. actions (Dict[Tensor]): The actions output by the policies. """ rnn_hidden_new, actions = deepcopy(rnn_hidden), {} agent_list = self.model_keys if agent_key is None else [agent_key] for key in agent_list: if self.use_rnn: outputs = self.actor_representation[key](observation[key], *rnn_hidden[key]) rnn_hidden_new.update({key: (outputs['rnn_hidden'], outputs['rnn_cell'])}) else: outputs = self.actor_representation[key](observation[key]) if self.use_parameter_sharing: actor_in = torch.concat([outputs['state'], agent_ids], dim=-1) else: actor_in = outputs['state'] actions[key] = self.actor[key](actor_in) return rnn_hidden_new, actions
[docs] def Qpolicy(self, observation: Dict[str, Tensor], actions: Dict[str, Tensor], agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns Q^policy of current observations and actions pairs. Parameters: observation (Dict[Tensor]): The observations. actions (Dict[Tensor]): The actions. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_eval: The evaluations of Q^policy. """ rnn_hidden_new, q_eval = deepcopy(rnn_hidden), {} agent_list = self.model_keys if agent_key is None else [agent_key] for key in agent_list: if self.use_rnn: outputs = self.critic_representation[key](observation[key], *rnn_hidden[key]) rnn_hidden_new.update({key: (outputs['rnn_hidden'], outputs['rnn_cell'])}) else: outputs = self.critic_representation[key](observation[key]) if self.use_parameter_sharing: critic_in = torch.concat([outputs['state'], agent_ids], dim=-1) else: critic_in = outputs['state'] q_eval[key] = self.critic[key](torch.concat([critic_in, actions[key]], dim=-1)) return rnn_hidden_new, q_eval
[docs] def Qtarget(self, next_observation: Dict[str, Tensor], next_actions: Dict[str, Tensor], agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns the Q^target of next observations and actions pairs. Parameters: next_observation (Dict[Tensor]): The observations of next step. next_actions (Dict[Tensor]): The actions of next step. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_target: The evaluations of Q^target. """ rnn_hidden_new, q_target = deepcopy(rnn_hidden), {} agent_list = self.model_keys if agent_key is None else [agent_key] for key in agent_list: if self.use_rnn: outputs = self.target_critic_representation[key](next_observation[key], *rnn_hidden[key]) rnn_hidden_new.update({key: (outputs['rnn_hidden'], outputs['rnn_cell'])}) else: outputs = self.target_critic_representation[key](next_observation[key]) if self.use_parameter_sharing: critic_in = torch.concat([outputs['state'], agent_ids], dim=-1) else: critic_in = outputs['state'] q_target[key] = self.target_critic[key](torch.concat([critic_in, next_actions[key]], dim=-1)) return rnn_hidden_new, q_target
[docs] def Atarget(self, next_observation: Dict[str, Tensor], agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns the next actions by target policies. Parameters: next_observation (Dict[Tensor]): The observations of next step. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. next_actions (Dict[Tensor]): The next actions. """ rnn_hidden_new, next_actions = deepcopy(rnn_hidden), {} agent_list = self.model_keys if agent_key is None else [agent_key] for key in agent_list: if self.use_rnn: outputs = self.target_actor_representation[key](next_observation[key], *rnn_hidden[key]) rnn_hidden_new.update({key: (outputs['rnn_hidden'], outputs['rnn_cell'])}) else: outputs = self.target_actor_representation[key](next_observation[key]) if self.use_parameter_sharing: actor_in = torch.concat([outputs['state'], agent_ids], dim=-1) else: actor_in = outputs['state'] next_actions[key] = self.target_actor[key](actor_in) return rnn_hidden_new, next_actions
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.actor_representation.parameters(), self.target_actor_representation.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.critic_representation.parameters(), self.target_critic_representation.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.actor.parameters(), self.target_actor.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.critic.parameters(), self.target_critic.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data)
class MADDPG_Policy(Independent_DDPG_Policy):
    """
    The policy of multi-agent deep deterministic policy gradient (MADDPG).

    Reuses the Independent_DDPG_Policy construction but sizes the critic head from the critic
    representation output alone (no per-agent action dimension is added to it).

    Args:
        action_space (Optional[Dict[str, Box]]): The action space.
        n_agents (int): The number of agents.
        actor_representation (Module): The representation module for actor network.
        critic_representation (Module): The representation module for critic network.
        actor_hidden_size (Sequence[int]): List of hidden units for actor network.
        critic_hidden_size (Sequence[int]): List of hidden units for critic network.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        activation_action (Optional[ModuleType]): The activation of final layer to bound the actions.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Other arguments, forwarded unchanged to Independent_DDPG_Policy.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Box]],
                 n_agents: int,
                 actor_representation: Optional[ModuleDict],
                 critic_representation: Optional[ModuleDict],
                 actor_hidden_size: Sequence[int],
                 critic_hidden_size: Sequence[int],
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 activation_action: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        super().__init__(action_space, n_agents, actor_representation, critic_representation,
                         actor_hidden_size, critic_hidden_size,
                         normalize, initialize, activation, activation_action, device,
                         use_distributed_training, **kwargs)

    def _get_actor_critic_input(self, dim_actor_rep, dim_action, dim_critic_rep, n_agents):
        """
        Returns the input dimensions of actor and critic networks.

        Parameters:
            dim_actor_rep: The dimension of the output of actor presentation.
            dim_action: The dimension of actions.
            dim_critic_rep: The dimension of the output of critic presentation.
            n_agents: The number of agents.

        Returns:
            dim_actor_in: The dimension of input of the actor networks.
            dim_actor_out: The dimension of output of the actor networks (the action dimension).
            dim_critic_in: The dimension of the input of critic networks.
        """
        # With parameter sharing the agent id vector (width n_agents) is appended to both inputs.
        id_width = n_agents if self.use_parameter_sharing else 0
        dim_actor_in = dim_actor_rep + id_width
        dim_actor_out = dim_action
        dim_critic_in = dim_critic_rep + id_width
        return dim_actor_in, dim_actor_out, dim_critic_in
[docs] def Qpolicy(self, joint_observation: Tensor, joint_actions: Tensor, agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns Q^policy of current observations and actions pairs. Parameters: joint_observation (Tensor): The joint observations of the team. joint_actions (Tensor): The joint actions of the team. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_eval: The evaluations of Q^policy. """ rnn_hidden_new, q_eval = deepcopy(rnn_hidden), {} agent_list = self.model_keys if agent_key is None else [agent_key] batch_size = joint_observation.shape[0] seq_len = joint_observation.shape[1] if self.use_rnn else 1 critic_rep_in = torch.concat([joint_observation, joint_actions], dim=-1) if self.use_rnn: outputs = {k: self.critic_representation[k](critic_rep_in, *rnn_hidden[k]) for k in agent_list} rnn_hidden_new.update({k: (outputs[k]['rnn_hidden'], outputs[k]['rnn_cell']) for k in agent_list}) else: outputs = {k: self.critic_representation[k](critic_rep_in) for k in agent_list} bs = batch_size * self.n_agents if self.use_parameter_sharing else batch_size for key in agent_list: if self.use_parameter_sharing: if self.use_rnn: joint_rep_out = outputs[key]['state'].unsqueeze(1).expand(-1, self.n_agents, -1, -1) joint_rep_out = joint_rep_out.reshape(bs, seq_len, -1) else: joint_rep_out = outputs[key]['state'].unsqueeze(1).expand(-1, self.n_agents, -1) joint_rep_out = joint_rep_out.reshape(bs, -1) critic_in = torch.concat([joint_rep_out, agent_ids], dim=-1) else: if self.use_rnn: joint_rep_out = outputs[key]['state'].reshape(bs, seq_len, -1) else: joint_rep_out = outputs[key]['state'].reshape(bs, -1) critic_in = joint_rep_out q_eval[key] = 
self.critic[key](critic_in) return rnn_hidden_new, q_eval
[docs] def Qtarget(self, joint_observation: Tensor, joint_actions: Tensor, agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns the Q^target of next observations and actions pairs. Parameters: joint_observation (Tensor): The joint observations of the team. joint_actions (Tensor): The joint actions of the team. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_target: The evaluations of Q^target. """ rnn_hidden_new, q_target = deepcopy(rnn_hidden), {} agent_list = self.model_keys if agent_key is None else [agent_key] batch_size = joint_observation.shape[0] seq_len = joint_observation.shape[1] if self.use_rnn else 1 critic_rep_in = torch.concat([joint_observation, joint_actions], dim=-1) if self.use_rnn: outputs = {k: self.target_critic_representation[k](critic_rep_in, *rnn_hidden[k]) for k in agent_list} rnn_hidden_new.update({k: (outputs[k]['rnn_hidden'], outputs[k]['rnn_cell']) for k in agent_list}) else: outputs = {k: self.target_critic_representation[k](critic_rep_in) for k in agent_list} bs = batch_size * self.n_agents if self.use_parameter_sharing else batch_size for key in agent_list: if self.use_parameter_sharing: if self.use_rnn: joint_rep_out = outputs[key]['state'].unsqueeze(1).expand(-1, self.n_agents, -1, -1) joint_rep_out = joint_rep_out.reshape(bs, seq_len, -1) else: joint_rep_out = outputs[key]['state'].unsqueeze(1).expand(-1, self.n_agents, -1) joint_rep_out = joint_rep_out.reshape(bs, -1) critic_in = torch.concat([joint_rep_out, agent_ids], dim=-1) else: if self.use_rnn: joint_rep_out = outputs[key]['state'].reshape(bs, seq_len, -1) else: joint_rep_out = outputs[key]['state'].reshape(bs, -1) critic_in = joint_rep_out q_target[key] = 
self.target_critic[key](critic_in) return rnn_hidden_new, q_target
class Independent_TD3_Policy(Independent_DDPG_Policy, Module):
    """
    The policy of twin delayed deep deterministic policy gradient (TD3) for independent agents.

    Builds one actor and two critics (A and B) per model key, each with its own representation
    and a deep-copied target network. Critic B's representation starts as a deep copy of the
    supplied critic representation.

    Args:
        action_space (Optional[Dict[str, Box]]): The action space.
        n_agents (int): The number of agents.
        actor_representation (Module): The representation module for actor network.
        critic_representation (Module): The representation module for critic network.
        actor_hidden_size (Sequence[int]): List of hidden units for actor network.
        critic_hidden_size (Sequence[int]): List of hidden units for critic network.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        activation_action (Optional[ModuleType]): The activation of final layer to bound the actions.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Must provide 'use_parameter_sharing', 'model_keys', 'rnn' and 'use_rnn'.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Box]],
                 n_agents: int,
                 actor_representation: Optional[ModuleDict],
                 critic_representation: Optional[ModuleDict],
                 actor_hidden_size: Sequence[int],
                 critic_hidden_size: Sequence[int],
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 activation_action: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        # Deliberately skips Independent_DDPG_Policy.__init__: the twin-critic layout is built here.
        Module.__init__(self)
        self.device = device
        self.action_space = action_space
        self.n_agents = n_agents
        self.use_parameter_sharing = kwargs['use_parameter_sharing']
        self.model_keys = kwargs['model_keys']
        self.actor_representation_info_shape = {key: actor_representation[key].output_shapes
                                                for key in self.model_keys}
        self.critic_representation_info_shape = {key: critic_representation[key].output_shapes
                                                 for key in self.model_keys}
        self.lstm = kwargs["rnn"] == "LSTM"
        self.use_rnn = bool(kwargs["use_rnn"])

        self.actor_representation = actor_representation
        self.critic_A_representation = critic_representation
        self.critic_B_representation = deepcopy(critic_representation)
        self.target_actor_representation = deepcopy(self.actor_representation)
        self.target_critic_A_representation = deepcopy(self.critic_A_representation)
        self.target_critic_B_representation = deepcopy(self.critic_B_representation)

        self.actor, self.target_actor = ModuleDict(), ModuleDict()
        self.critic_A, self.critic_B = ModuleDict(), ModuleDict()
        self.target_critic_A, self.target_critic_B = ModuleDict(), ModuleDict()
        for key in self.model_keys:
            dim_action = self.action_space[key].shape[-1]
            dim_actor_rep = self.actor_representation[key].output_shapes['state'][0]
            dim_critic_rep = self.critic_A_representation[key].output_shapes['state'][0]
            dim_actor_in, dim_actor_out, dim_critic_in = self._get_actor_critic_input(
                dim_actor_rep, dim_action, dim_critic_rep, n_agents)

            self.actor[key] = ActorNet(dim_actor_in, dim_actor_out, actor_hidden_size,
                                       normalize, initialize, activation, activation_action, device)
            # The two critic heads are constructed independently with the same architecture.
            self.critic_A[key] = CriticNet(dim_critic_in, critic_hidden_size,
                                           normalize, initialize, activation, device)
            self.critic_B[key] = CriticNet(dim_critic_in, critic_hidden_size,
                                           normalize, initialize, activation, device)
            self.target_actor[key] = deepcopy(self.actor[key])
            self.target_critic_A[key] = deepcopy(self.critic_A[key])
            self.target_critic_B[key] = deepcopy(self.critic_B[key])

        # Prepare DDP module: only the online (trainable) networks are wrapped.
        self.distributed_training = use_distributed_training
        if self.distributed_training:
            self.rank = int(os.environ["RANK"])
            for key in self.model_keys:
                if self.actor_representation[key]._get_name() != "Basic_Identical":
                    self.actor_representation[key] = DistributedDataParallel(
                        module=self.actor_representation[key], device_ids=[self.rank])
                if self.critic_A_representation[key]._get_name() != "Basic_Identical":
                    self.critic_A_representation[key] = DistributedDataParallel(
                        module=self.critic_A_representation[key], device_ids=[self.rank])
                if self.critic_B_representation[key]._get_name() != "Basic_Identical":
                    self.critic_B_representation[key] = DistributedDataParallel(
                        module=self.critic_B_representation[key], device_ids=[self.rank])
                self.actor[key] = DistributedDataParallel(module=self.actor[key], device_ids=[self.rank])
                self.critic_A[key] = DistributedDataParallel(module=self.critic_A[key], device_ids=[self.rank])
                self.critic_B[key] = DistributedDataParallel(module=self.critic_B[key], device_ids=[self.rank])

    @property
    def parameters_critic(self):
        """Per-key list of critic parameters: A representation, A head, B representation, B head."""
        return {
            key: (list(self.critic_A_representation[key].parameters())
                  + list(self.critic_A[key].parameters())
                  + list(self.critic_B_representation[key].parameters())
                  + list(self.critic_B[key].parameters()))
            for key in self.model_keys
        }
[docs] def Qpolicy(self, observation: Dict[str, Tensor], actions: Dict[str, Tensor], agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns Q^policy of current observations and actions pairs. Parameters: observation (Dict[Tensor]): The observations. actions (Dict[Tensor]): The actions. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_eval: The evaluations of Q^policy. """ q_eval, q_eval_A, q_eval_B = {}, {}, {} agent_list = self.model_keys if agent_key is None else [agent_key] for key in agent_list: if self.use_rnn: outputs_A = self.critic_A_representation[key](observation[key], *rnn_hidden[key]) outputs_B = self.critic_B_representation[key](observation[key], *rnn_hidden[key]) else: outputs_A = self.critic_A_representation[key](observation[key]) outputs_B = self.critic_B_representation[key](observation[key]) if self.use_parameter_sharing: critic_in_A = torch.concat([outputs_A['state'], agent_ids], dim=-1) critic_in_B = torch.concat([outputs_B['state'], agent_ids], dim=-1) else: critic_in_A = outputs_A['state'] critic_in_B = outputs_B['state'] q_eval_A[key] = self.critic_A[key](torch.concat([critic_in_A, actions[key]], dim=-1)) q_eval_B[key] = self.critic_B[key](torch.concat([critic_in_B, actions[key]], dim=-1)) q_eval[key] = (q_eval_A[key] + q_eval_B[key]) / 2.0 return q_eval_A, q_eval_B, q_eval
[docs] def Qtarget(self, next_observation: Dict[str, Tensor], next_actions: Dict[str, Tensor], agent_ids: Tensor = None, agent_key: str = None, rnn_hidden: Optional[Dict[str, List[Tensor]]] = None): """ Returns the Q^target of next observations and actions pairs. Parameters: next_observation (Dict[Tensor]): The observations of next step. next_actions (Dict[Tensor]): The actions of next step. agent_ids (Dict[Tensor]): The agents' ids (for parameter sharing). agent_key (str): Calculate actions for specified agent. rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN. Returns: rnn_hidden_new (Optional[Dict[str, List[Tensor]]]): The new hidden variables of the RNN. q_target: The evaluations of Q^target. """ q_target = {} agent_list = self.model_keys if agent_key is None else [agent_key] for key in agent_list: if self.use_rnn: outputs_A = self.target_critic_A_representation[key](next_observation[key], *rnn_hidden[key]) outputs_B = self.target_critic_B_representation[key](next_observation[key], *rnn_hidden[key]) else: outputs_A = self.target_critic_A_representation[key](next_observation[key]) outputs_B = self.target_critic_B_representation[key](next_observation[key]) if self.use_parameter_sharing: critic_in_A = torch.concat([outputs_A['state'], agent_ids], dim=-1) critic_in_B = torch.concat([outputs_B['state'], agent_ids], dim=-1) else: critic_in_A = outputs_A['state'] critic_in_B = outputs_B['state'] q_target_A = self.target_critic_A[key](torch.concat([critic_in_A, next_actions[key]], dim=-1)) q_target_B = self.target_critic_B[key](torch.concat([critic_in_B, next_actions[key]], dim=-1)) q_target[key] = torch.minimum(q_target_A, q_target_B) return q_target
[docs] def soft_update(self, tau=0.005): for ep, tp in zip(self.actor_representation.parameters(), self.target_actor_representation.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.critic_A_representation.parameters(), self.target_critic_A_representation.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.critic_B_representation.parameters(), self.target_critic_B_representation.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.actor.parameters(), self.target_actor.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.critic_A.parameters(), self.target_critic_A.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data) for ep, tp in zip(self.critic_B.parameters(), self.target_critic_B.parameters()): tp.data.mul_(1 - tau) tp.data.add_(tau * ep.data)
class MATD3_Policy(MADDPG_Policy, Module):
    """
    Multi-agent TD3 policy: for every model key it holds one actor and two critics (A and B),
    each with its own representation module and a deep-copied target counterpart.

    The critic representations are fed the concatenation of the joint observations and the joint
    actions. ``Qpolicy`` evaluates both online critics and their average, while ``Qtarget`` returns
    the element-wise minimum of the two target critics.

    Args:
        action_space (Optional[Dict[str, Box]]): The action space of each agent.
        n_agents (int): The number of agents.
        actor_representation (Optional[ModuleDict]): The actor representation modules, indexed by model key.
        critic_representation (Optional[ModuleDict]): The critic representation modules, indexed by model key.
            Critic A uses this object directly; critic B uses a deep copy of it.
        actor_hidden_size (Sequence[int]): List of hidden units for actor network.
        critic_hidden_size (Sequence[int]): List of hidden units for critic network.
        normalize (Optional[ModuleType]): The layer normalization over a minibatch of inputs.
        initialize (Optional[Callable[..., Tensor]]): The parameters initializer.
        activation (Optional[ModuleType]): The activation function for each layer.
        activation_action (Optional[ModuleType]): The activation of final layer to bound the actions.
        device (Optional[Union[str, int, torch.device]]): The calculating device.
        use_distributed_training (bool): Whether to use multi-GPU for distributed training.
        **kwargs: Other arguments. The keys 'use_parameter_sharing', 'model_keys', 'rnn' and 'use_rnn'
            are read here and must be present.
    """

    def __init__(self,
                 action_space: Optional[Dict[str, Box]],
                 n_agents: int,
                 actor_representation: Optional[ModuleDict],
                 critic_representation: Optional[ModuleDict],
                 actor_hidden_size: Sequence[int],
                 critic_hidden_size: Sequence[int],
                 normalize: Optional[ModuleType] = None,
                 initialize: Optional[Callable[..., Tensor]] = None,
                 activation: Optional[ModuleType] = None,
                 activation_action: Optional[ModuleType] = None,
                 device: Optional[Union[str, int, torch.device]] = None,
                 use_distributed_training: bool = False,
                 **kwargs):
        # Only the torch Module initializer is run; the MADDPG parent's __init__ is not called.
        Module.__init__(self)
        self.device = device
        self.action_space = action_space
        self.n_agents = n_agents
        self.use_parameter_sharing = kwargs['use_parameter_sharing']
        self.model_keys = kwargs['model_keys']
        self.actor_representation_info_shape = {
            k: actor_representation[k].output_shapes for k in self.model_keys
        }
        self.critic_representation_info_shape = {
            k: critic_representation[k].output_shapes for k in self.model_keys
        }
        self.lstm = kwargs["rnn"] == "LSTM"
        self.use_rnn = bool(kwargs["use_rnn"])

        # Online representations: critic B starts as a deep copy of critic A's representation.
        self.actor_representation = actor_representation
        self.critic_A_representation = critic_representation
        self.critic_B_representation = deepcopy(critic_representation)
        # Target representations are deep copies of the online ones.
        self.target_actor_representation = deepcopy(self.actor_representation)
        self.target_critic_A_representation = deepcopy(self.critic_A_representation)
        self.target_critic_B_representation = deepcopy(self.critic_B_representation)

        # Containers for the output heads (registration order is kept as: actor, target actor,
        # critic A, critic B, target critic A, target critic B).
        self.actor = ModuleDict()
        self.target_actor = ModuleDict()
        self.critic_A = ModuleDict()
        self.critic_B = ModuleDict()
        self.target_critic_A = ModuleDict()
        self.target_critic_B = ModuleDict()

        for key in self.model_keys:
            act_dim = self.action_space[key].shape[-1]
            actor_state_dim = self.actor_representation[key].output_shapes['state'][0]
            critic_state_dim = self.critic_A_representation[key].output_shapes['state'][0]
            actor_in, actor_out, critic_in = self._get_actor_critic_input(
                actor_state_dim, act_dim, critic_state_dim, n_agents)

            self.actor[key] = ActorNet(actor_in, actor_out, actor_hidden_size,
                                       normalize, initialize, activation, activation_action, device)
            self.critic_A[key] = CriticNet(critic_in, critic_hidden_size,
                                           normalize, initialize, activation, device)
            self.critic_B[key] = CriticNet(critic_in, critic_hidden_size,
                                           normalize, initialize, activation, device)
            self.target_actor[key] = deepcopy(self.actor[key])
            self.target_critic_A[key] = deepcopy(self.critic_A[key])
            self.target_critic_B[key] = deepcopy(self.critic_B[key])

        # Prepare DDP module.
        self.distributed_training = use_distributed_training
        if self.distributed_training:
            self.rank = int(os.environ["RANK"])
            online_representations = (self.actor_representation,
                                      self.critic_A_representation,
                                      self.critic_B_representation)
            online_heads = (self.actor, self.critic_A, self.critic_B)
            for key in self.model_keys:
                # Representations named "Basic_Identical" are left unwrapped.
                for rep_dict in online_representations:
                    if rep_dict[key]._get_name() != "Basic_Identical":
                        rep_dict[key] = DistributedDataParallel(rep_dict[key], device_ids=[self.rank])
                for head_dict in online_heads:
                    head_dict[key] = DistributedDataParallel(module=head_dict[key], device_ids=[self.rank])

    @property
    def parameters_critic(self):
        """
        Collect the trainable parameters of both online critics for each model key.

        Returns:
            A dict mapping each model key to a list holding, in order, the parameters of critic A's
            representation, critic A, critic B's representation and critic B.
        """
        return {
            key: [
                *self.critic_A_representation[key].parameters(),
                *self.critic_A[key].parameters(),
                *self.critic_B_representation[key].parameters(),
                *self.critic_B[key].parameters(),
            ]
            for key in self.model_keys
        }

    def _joint_twin_inputs(self, rep_A, rep_B, joint_observation, joint_actions,
                           agent_ids, agent_list, rnn_hidden):
        """Run both critic representations on the joint input and build, per key, the two critic inputs."""
        # The first dimension is treated as the batch size; with an RNN the second one is the sequence length.
        n_rows = joint_observation.shape[0]
        if self.use_parameter_sharing:
            n_rows = n_rows * self.n_agents
        if self.use_rnn:
            expand_sizes = (-1, self.n_agents, -1, -1)
            flat_shape = (n_rows, joint_observation.shape[1], -1)
        else:
            expand_sizes = (-1, self.n_agents, -1)
            flat_shape = (n_rows, -1)

        rep_input = torch.concat([joint_observation, joint_actions], dim=-1)

        # All representations of critic A are evaluated first, then all of critic B.
        states_A, states_B = {}, {}
        for rep_dict, store in ((rep_A, states_A), (rep_B, states_B)):
            for k in agent_list:
                extra = rnn_hidden[k] if self.use_rnn else ()
                store[k] = rep_dict[k](rep_input, *extra)

        def to_critic_input(state):
            if self.use_parameter_sharing:
                # Repeat the joint features once per agent, fold the agent axis into the batch axis,
                # then append the agent ids.
                tiled = state.unsqueeze(1).expand(*expand_sizes).reshape(*flat_shape)
                return torch.concat([tiled, agent_ids], dim=-1)
            return state.reshape(*flat_shape)

        return {
            key: (to_critic_input(states_A[key]['state']), to_critic_input(states_B[key]['state']))
            for key in agent_list
        }

    def Qpolicy(self, joint_observation: Tensor, joint_actions: Tensor,
                agent_ids: Tensor = None, agent_key: str = None,
                rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Evaluate the two online critics on joint observation-action pairs.

        Parameters:
            joint_observation (Tensor): The joint observations of the team.
            joint_actions (Tensor): The joint actions of the team.
            agent_ids (Tensor): The agents' ids; concatenated to the features when parameter sharing is on.
            agent_key (str): If given, only this key is evaluated; otherwise all model keys are.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN,
                unpacked into the representation call when use_rnn is set.

        Returns:
            q_eval_A (Dict[str, Tensor]): The evaluations of Q^policy calculated by critic A.
            q_eval_B (Dict[str, Tensor]): The evaluations of Q^policy calculated by critic B.
            q_eval (Dict[str, Tensor]): The mean of the critic A and critic B evaluations.
        """
        selected = self.model_keys if agent_key is None else [agent_key]
        critic_inputs = self._joint_twin_inputs(
            self.critic_A_representation, self.critic_B_representation,
            joint_observation, joint_actions, agent_ids, selected, rnn_hidden)

        q_eval, q_eval_A, q_eval_B = {}, {}, {}
        for key in selected:
            input_A, input_B = critic_inputs[key]
            value_A = self.critic_A[key](input_A)
            value_B = self.critic_B[key](input_B)
            q_eval_A[key] = value_A
            q_eval_B[key] = value_B
            q_eval[key] = (value_A + value_B) / 2.0
        return q_eval_A, q_eval_B, q_eval

    def Qtarget(self, joint_observation: Tensor, joint_actions: Tensor,
                agent_ids: Tensor = None, agent_key: str = None,
                rnn_hidden: Optional[Dict[str, List[Tensor]]] = None):
        """
        Evaluate the two target critics on joint observation-action pairs.

        Parameters:
            joint_observation (Tensor): The joint observations of the team.
            joint_actions (Tensor): The joint actions of the team.
            agent_ids (Tensor): The agents' ids; concatenated to the features when parameter sharing is on.
            agent_key (str): If given, only this key is evaluated; otherwise all model keys are.
            rnn_hidden (Optional[Dict[str, List[Tensor]]]): The hidden variables of the RNN,
                unpacked into the representation call when use_rnn is set.

        Returns:
            q_target (Dict[str, Tensor]): The element-wise minimum of target critic A and target critic B.
        """
        selected = self.model_keys if agent_key is None else [agent_key]
        critic_inputs = self._joint_twin_inputs(
            self.target_critic_A_representation, self.target_critic_B_representation,
            joint_observation, joint_actions, agent_ids, selected, rnn_hidden)

        q_target = {}
        for key in selected:
            input_A, input_B = critic_inputs[key]
            value_A = self.target_critic_A[key](input_A)
            value_B = self.target_critic_B[key](input_B)
            q_target[key] = torch.minimum(value_A, value_B)
        return q_target

    def soft_update(self, tau=0.005):
        """
        Move every target network towards its online network in place:
        target <- (1 - tau) * target + tau * online.

        Parameters:
            tau: The interpolation weight given to the online parameters.
        """
        online_target_pairs = (
            (self.actor_representation, self.target_actor_representation),
            (self.critic_A_representation, self.target_critic_A_representation),
            (self.critic_B_representation, self.target_critic_B_representation),
            (self.actor, self.target_actor),
            (self.critic_A, self.target_critic_A),
            (self.critic_B, self.target_critic_B),
        )
        for online_net, target_net in online_target_pairs:
            for ep, tp in zip(online_net.parameters(), target_net.parameters()):
                tp.data.mul_(1 - tau)
                tp.data.add_(tau * ep.data)