Source code for xuance.tensorflow.agents.multi_agent_rl.dcg_agents

import numpy as np
from argparse import Namespace
from gymnasium.spaces import Space
from xuance.common import List, Optional, MultiAgentBaseCallback
from xuance.environment import DummyVecMultiAgentEnv, SubprocVecMultiAgentEnv
from xuance.tensorflow import tk
from xuance.tensorflow.agents import MARLAgents


[docs] class DCG_Agents(MARLAgents): def __init__( self, config: Namespace, envs: Optional[DummyVecMultiAgentEnv | SubprocVecMultiAgentEnv] = None, num_agents: Optional[int] = None, agent_keys: Optional[List[str]] = None, state_space: Optional[Space] = None, observation_space: Optional[Space] = None, action_space: Optional[Space] = None, callback: Optional[MultiAgentBaseCallback] = None ): super(DCG_Agents, self).__init__( config, envs, num_agents, agent_keys, state_space, observation_space, action_space, callback ) self.gamma = config.gamma self.start_greedy, self.end_greedy = config.start_greedy, config.end_greedy self.egreedy = self.start_greedy self.delta_egreedy = (self.start_greedy - self.end_greedy) / config.decay_step_greedy input_representation = get_repre_in(config) self.use_rnn = config.use_rnn if self.use_rnn: kwargs_rnn = {"N_recurrent_layers": config.N_recurrent_layers, "dropout": config.dropout, "rnn": config.rnn} representation = REGISTRY_Representation[config.representation](*input_representation, **kwargs_rnn) else: representation = REGISTRY_Representation[config.representation](*input_representation) repre_state_dim = representation.output_shapes['state'][0] from xuance.tensorflow.policies.coordination_graph import DCG_utility, DCG_payoff, Coordination_Graph utility = DCG_utility(repre_state_dim, config.hidden_utility_dim, config.dim_act) payoffs = DCG_payoff(repre_state_dim * 2, config.hidden_payoff_dim, config.dim_act, config) dcgraph = Coordination_Graph(config.n_agents, config.graph_type) dcgraph.set_coordination_graph() if config.env_name == "StarCraft2": action_space = config.action_space else: action_space = config.action_space[config.agent_keys[0]] if config.agent == "DCG_S": policy = REGISTRY_Policy[config.policy](action_space, config.state_space.shape[0], representation, utility, payoffs, dcgraph, config.hidden_bias_dim, None, None, tk.layers.Activation('relu'), device, use_rnn=config.use_rnn, rnn=config.rnn) else: policy = REGISTRY_Policy[config.policy](action_space, config.state_space.shape[0], representation, utility, payoffs, dcgraph, None, None, None, tk.layers.Activation('relu'), device, use_rnn=config.use_rnn, rnn=config.rnn) lr_scheduler = MyLinearLR(config.learning_rate, start_factor=1.0, end_factor=self.end_factor_lr_decay, total_iters=get_total_iters(config.agent_name, config)) optimizer = tk.optimizers.Adam(lr_scheduler) self.observation_space = envs.observation_space self.action_space = envs.action_space self.representation_info_shape = policy.representation.output_shapes self.auxiliary_info_shape = {} if config.state_space is not None: config.dim_state, state_shape = config.state_space.shape, config.state_space.shape else: config.dim_state, state_shape = None, None buffer = MARL_OffPolicyBuffer_RNN if self.use_rnn else MARL_OffPolicyBuffer input_buffer = (config.n_agents, state_shape, config.obs_shape, config.act_shape, config.rew_shape, config.done_shape, envs.num_envs, config.buffer_size, config.batch_size) memory = buffer(*input_buffer, max_episode_steps=envs.max_episode_steps, dim_act=config.dim_act) from xuance.tensorflow.learners.multi_agent_rl.dcg_learner import DCG_Learner learner = DCG_Learner(config, policy, optimizer, config.device, config.model_dir, config.gamma, config.sync_frequency) super(DCG_Agents, self).__init__(config, envs, policy, memory, learner, device, config.log_dir, config.model_dir) self.on_policy = False
[docs] def act(self, obs_n, *rnn_hidden, avail_actions=None, test_mode=False): batch_size = obs_n.shape[0] obs_n = tf.convert_to_tensor(obs_n) obs_in = tf.reshape(obs_n, [batch_size * self.n_agents, 1, -1]) rnn_hidden_next, hidden_states = self.learner.get_hidden_states(obs_in, *rnn_hidden) greedy_actions = self.learner.act(tf.reshape(hidden_states, [batch_size, self.n_agents, -1]), avail_actions=avail_actions) greedy_actions = greedy_actions.numpy() if test_mode: return rnn_hidden_next, greedy_actions else: if avail_actions is None: random_actions = np.random.choice(self.dim_act, [self.nenvs, self.n_agents]) else: random_actions = CategoricalDistribution(tf.convert_to_tensor(avail_actions)).stochastic_sample().numpy() if np.random.rand() < self.egreedy: return rnn_hidden_next, random_actions else: return rnn_hidden_next, greedy_actions
[docs] def train(self, i_step, n_epochs=1): if self.egreedy >= self.end_greedy: self.egreedy = self.start_greedy - self.delta_egreedy * i_step info_train = {} if i_step > self.start_training: for i_epoch in range(n_epochs): sample = self.memory.sample() if self.use_rnn: info_train = self.learner.update_recurrent(sample) else: info_train = self.learner.update(sample) info_train["epsilon-greedy"] = self.egreedy return info_train