Source code for xuance.tensorflow.agents.multi_agent_rl.mfq_agents

import numpy as np
from operator import itemgetter
from argparse import Namespace
from tqdm import tqdm
from copy import deepcopy
from gymnasium.spaces import Space
from xuance.common import (
    List, Optional, MeanField_OffPolicyBuffer, MeanField_OffPolicyBuffer_RNN, MultiAgentBaseCallback
)
from xuance.environment import DummyVecMultiAgentEnv, SubprocVecMultiAgentEnv
from xuance.tensorflow import Module, tf
from xuance.tensorflow.utils import NormalizeFunctions, ActivationFunctions, InitializeFunctions
from xuance.tensorflow.policies import REGISTRY_Policy
from xuance.tensorflow.agents import OffPolicyMARLAgents


[docs] class MFQ_Agents(OffPolicyMARLAgents): def __init__( self, config: Namespace, envs: Optional[DummyVecMultiAgentEnv | SubprocVecMultiAgentEnv] = None, num_agents: Optional[int] = None, agent_keys: Optional[List[str]] = None, state_space: Optional[Space] = None, observation_space: Optional[Space] = None, action_space: Optional[Space] = None, callback: Optional[MultiAgentBaseCallback] = None ): super(MFQ_Agents, self).__init__( config, envs, num_agents, agent_keys, state_space, observation_space, action_space, callback ) self.n_actions_list = [a_space.n for a_space in self.action_space.values()] self.n_actions_max = max(self.n_actions_list) self.actions_mean = [{k: np.zeros(self.n_actions_max) for k in self.agent_keys} for _ in range(self.n_envs)] self.start_greedy, self.end_greedy = config.start_greedy, config.end_greedy self.delta_egreedy = (self.start_greedy - self.end_greedy) / config.decay_step_greedy self.e_greedy = self.start_greedy self.policy = self._build_policy() # build policy self.memory = self._build_memory() # build memory self.learner = self._build_learner(self.config, self.model_keys, self.agent_keys, self.policy, self.callback) def _build_memory(self): """Build replay buffer for models training """ if self.use_actions_mask: avail_actions_shape = {key: (self.action_space[key].n,) for key in self.agent_keys} else: avail_actions_shape = None input_dict = dict(agent_keys=self.agent_keys, state_space=self.state_space if self.use_global_state else None, obs_space=self.observation_space, act_space=self.action_space, n_envs=self.n_envs, buffer_size=self.buffer_size, batch_size=self.batch_size, avail_actions_shape=avail_actions_shape, use_actions_mask=self.use_actions_mask, max_episode_steps=self.episode_length, n_actions_max=self.n_actions_max) Buffer = MeanField_OffPolicyBuffer_RNN if self.use_rnn else MeanField_OffPolicyBuffer return Buffer(**input_dict) def _build_policy(self) -> Module: """ Build representation(s) and policy(ies) for agent(s) Returns: policy (Module): A dict of policies. """ normalize_fn = NormalizeFunctions[self.config.normalize] if hasattr(self.config, "normalize") else None initializer = InitializeFunctions[self.config.initialize] if hasattr(self.config, "initialize") else None activation = ActivationFunctions[self.config.activation] # build representations representation = self._build_representation(self.config.representation, self.observation_space, self.config) # build policies if self.config.policy == "MF_Q_network": policy = REGISTRY_Policy["MF_Q_network"]( action_space=self.action_space, n_agents=self.n_agents, representation=representation, hidden_size=self.config.q_hidden_size, normalize=normalize_fn, initialize=initializer, activation=activation, use_distributed_training=self.distributed_training, use_parameter_sharing=self.use_parameter_sharing, model_keys=self.model_keys, use_rnn=self.use_rnn, rnn=self.config.rnn if self.use_rnn else None, temperature=self.config.temperature, policy_type="Boltzmann", # "Boltzmann" or "greedy" action_embedding_hidden_size=self.config.action_embedding_hidden_size) else: raise AttributeError(f"MFQ currently does not support the policy named {self.config.policy}.") return policy def _build_inputs_mean_mask(self, agent_mask: Optional[dict] = None, act_mean_dict=None): batch_size = len(act_mean_dict) agent_mask_array = np.array([itemgetter(*self.agent_keys)(data) for data in agent_mask]) # get mean actions as input if self.use_parameter_sharing: key = self.agent_keys[0] mean_actions_array = np.array([itemgetter(*self.agent_keys)(data) for data in act_mean_dict], dtype=np.float32) if self.use_rnn: mean_actions_input = {key: mean_actions_array.reshape([batch_size * self.n_agents, 1, -1])} else: mean_actions_input = {key: mean_actions_array.reshape([batch_size * self.n_agents, -1])} else: if self.use_rnn: mean_actions_input = {k: np.stack([data[k] for data in act_mean_dict]).astype(np.float32).reshape( [batch_size, 1, -1]) for k in self.agent_keys} else: mean_actions_input = {k: np.stack([data[k] for data in act_mean_dict]).astype(np.float32).reshape( [batch_size, -1]) for k in self.agent_keys} return mean_actions_input, agent_mask_array
[docs] def store_experience(self, obs_dict, avail_actions, actions_dict, obs_next_dict, avail_actions_next, rewards_dict, terminals_dict, info, **kwargs): """ Store experience data into replay buffer. Parameters: obs_dict (List[dict]): Observations for each agent in self.agent_keys. avail_actions (List[dict]): Actions mask values for each agent in self.agent_keys. actions_dict (List[dict]): Actions for each agent in self.agent_keys. obs_next_dict (List[dict]): Next observations for each agent in self.agent_keys. avail_actions_next (List[dict]): The next actions mask values for each agent in self.agent_keys. rewards_dict (List[dict]): Rewards for each agent in self.agent_keys. terminals_dict (List[dict]): Terminated values for each agent in self.agent_keys. info (List[dict]): Other information for the environment at current step. """ experience_data = { 'obs': {k: np.array([data[k] for data in obs_dict]) for k in self.agent_keys}, 'actions': {k: np.array([data[k] for data in actions_dict]) for k in self.agent_keys}, 'obs_next': {k: np.array([data[k] for data in obs_next_dict]) for k in self.agent_keys}, 'rewards': {k: np.array([data[k] for data in rewards_dict]) for k in self.agent_keys}, 'terminals': {k: np.array([data[k] for data in terminals_dict]) for k in self.agent_keys}, 'agent_mask': {k: np.array([data[k] for data in kwargs['agent_mask']]) for k in self.agent_keys}, } if self.use_rnn: experience_data['episode_steps'] = np.array([data['episode_step'] - 1 for data in info]) if self.use_global_state: experience_data['state'] = np.array(kwargs['state']) experience_data['state_next'] = np.array(kwargs['next_state']) if self.use_actions_mask: experience_data['avail_actions'] = {k: np.array([data[k] for data in avail_actions]) for k in self.agent_keys} experience_data['avail_actions_next'] = {k: np.array([data[k] for data in avail_actions_next]) for k in self.agent_keys} experience_data['actions_mean'] = {k: np.array([data[k] for data in kwargs['actions_mean']]) for k in self.agent_keys} experience_data['actions_mean_next'] = {k: np.array([data[k] for data in kwargs['actions_mean_next']]) for k in self.agent_keys} self.memory.store(**experience_data)
[docs] def get_actions(self, obs_dict: List[dict], agent_mask: Optional[List[dict]] = None, act_mean_dict: Optional[List[dict]] = None, avail_actions_dict: Optional[List[dict]] = None, rnn_hidden: Optional[dict] = None, test_mode: Optional[bool] = False): batch_size = len(obs_dict) mean_actions_input, agent_mask_array = self._build_inputs_mean_mask(agent_mask, act_mean_dict) obs_input, agents_id, avail_actions_input = self._build_inputs(obs_dict, avail_actions_dict) agent_mask_tensor = tf.cast(agent_mask_array, dtype=tf.float32) hidden_state, actions, q_output = self.policy(observation=obs_input, agent_ids=agents_id, actions_mean=mean_actions_input, avail_actions=avail_actions_input, rnn_hidden=rnn_hidden) actions_mean_masked = self.policy.get_mean_actions(actions=actions, agent_mask_tensor=agent_mask_tensor, batch_size=batch_size) actions_mean_masked = actions_mean_masked.numpy() actions_mean_dict = [{k: actions_mean_masked[e, i] for i, k in enumerate(self.agent_keys)} for e in range(batch_size)] if self.use_parameter_sharing: key = self.agent_keys[0] actions_out = actions[key].numpy().reshape([batch_size, self.n_agents]) actions_dict = [{k: actions_out[e, i] for i, k in enumerate(self.agent_keys)} for e in range(batch_size)] else: actions_out = {k: actions[k].numpy().reshape(batch_size) for k in self.agent_keys} actions_dict = [{k: actions_out[k][i] for k in self.agent_keys} for i in range(batch_size)] if not test_mode: # get random actions actions_dict = self.exploration(batch_size, actions_dict, avail_actions_dict) return {"hidden_state": hidden_state, "actions": actions_dict, "actions_mean": actions_mean_dict}
[docs] def train(self, train_steps: int) -> dict: train_info = {} if self.use_rnn: with tqdm(total=train_steps) as process_bar: step_start, step_last = deepcopy(self.current_step), deepcopy(self.current_step) n_steps_all = train_steps * self.n_envs while step_last - step_start < n_steps_all: self.run_episodes(None, n_episodes=self.n_envs, test_mode=False) if self.current_step >= self.start_training: update_info = self.train_epochs(n_epochs=self.n_epochs) self.log_infos(update_info, self.current_step) train_info.update(update_info) self.callback.on_train_epochs_end(self.current_step, policy=self.policy, memory=self.memory, current_episode=self.current_episode, train_steps=train_steps, update_info=update_info) process_bar.update((self.current_step - step_last) // self.n_envs) step_last = deepcopy(self.current_step) process_bar.update(train_steps - process_bar.last_print_n) self.callback.on_train_step_end(self.current_step, envs=self.train_envs, policy=self.policy, train_steps=train_steps, train_info=train_info) return train_info obs_dict = self.train_envs.buf_obs agent_mask_dict = [data['agent_mask'] for data in self.train_envs.buf_info] actions_mean_dict = self.actions_mean avail_actions = self.train_envs.buf_avail_actions if self.use_actions_mask else None state = self.train_envs.buf_state.copy() if self.use_global_state else None for _ in tqdm(range(train_steps)): policy_out = self.get_actions(obs_dict=obs_dict, agent_mask=agent_mask_dict, act_mean_dict=actions_mean_dict, avail_actions_dict=avail_actions, test_mode=False) actions_dict = policy_out['actions'] actions_mean_next_dict = policy_out['actions_mean'] next_obs_dict, rewards_dict, terminated_dict, truncated, info = self.train_envs.step(actions_dict) next_state = self.train_envs.buf_state.copy() if self.use_global_state else None next_avail_actions = self.train_envs.buf_avail_actions if self.use_actions_mask else None self.callback.on_train_step(self.current_step, envs=self.train_envs, policy=self.policy, obs=obs_dict, next_obs=next_obs_dict, policy_out=policy_out, acts=actions_dict, actions_mean_dict=actions_mean_dict, rewards=rewards_dict, state=state, next_state=next_state, avail_actions=avail_actions, next_avail_actions=next_avail_actions, terminals=terminated_dict, truncations=truncated, infos=info, train_steps=train_steps) self.store_experience(obs_dict, avail_actions, actions_dict, next_obs_dict, next_avail_actions, rewards_dict, terminated_dict, info, **{'state': state, 'next_state': next_state, 'agent_mask': agent_mask_dict, 'actions_mean': actions_mean_dict, 'actions_mean_next': actions_mean_next_dict}) if self.current_step >= self.start_training and self.current_step % self.training_frequency == 0: update_info = self.train_epochs(n_epochs=self.n_epochs) self.log_infos(update_info, self.current_step) train_info.update(update_info) self.callback.on_train_epochs_end(self.current_step, policy=self.policy, memory=self.memory, current_episode=self.current_episode, train_steps=train_steps, update_info=update_info) obs_dict = deepcopy(next_obs_dict) agent_mask_dict = [data['agent_mask'] for data in info] actions_mean_dict = deepcopy(actions_mean_next_dict) if self.use_global_state: state = deepcopy(next_state) if self.use_actions_mask: avail_actions = deepcopy(next_avail_actions) for i in range(self.n_envs): if all(terminated_dict[i].values()) or truncated[i]: obs_dict[i] = info[i]["reset_obs"] self.train_envs.buf_obs[i] = info[i]["reset_obs"] if self.use_global_state: state = info[i]["reset_state"] self.train_envs.buf_state[i] = info[i]["reset_state"] if self.use_actions_mask: avail_actions[i] = info[i]["reset_avail_actions"] self.train_envs.buf_avail_actions[i] = info[i]["reset_avail_actions"] self.train_envs.buf_info[i]["agent_mask"] = {k: True for k in self.agent_keys} agent_mask_dict[i] = {k: True for k in self.agent_keys} actions_mean_dict[i] = {k: np.zeros(self.n_actions_max) for k in self.agent_keys} self.current_episode[i] += 1 if self.use_wandb: episode_info = { f"Train-Results/Episode-Steps/env-%d" % i: info[i]["episode_step"], f"Train-Results/Episode-Rewards/env-%d" % i: info[i]["episode_score"] } else: episode_info = { f"Train-Results/Episode-Steps": {"env-%d" % i: info[i]["episode_step"]}, f"Train-Results/Episode-Rewards": { "env-%d" % i: np.mean(itemgetter(*self.agent_keys)(info[i]["episode_score"]))} } self.log_infos(episode_info, self.current_step) train_info.update(episode_info) self.callback.on_train_episode_info(envs=self.train_envs, policy=self.policy, env_id=i, infos=info, use_wandb=self.use_wandb, current_step=self.current_step, current_episode=self.current_episode, train_steps=train_steps) self.current_step += self.n_envs self._update_explore_factor() self.actions_mean = deepcopy(actions_mean_dict) self.callback.on_train_step_end(self.current_step, envs=self.train_envs, policy=self.policy, train_steps=train_steps, train_info=train_info) return train_info
[docs] def run_episodes(self, n_episodes: int = 1, run_envs: Optional[DummyVecMultiAgentEnv | SubprocVecMultiAgentEnv] = None, test_mode: bool = False, close_envs: bool = True) -> list: envs = self.train_envs if run_envs is None else run_envs num_envs = envs.num_envs videos, episode_videos, images = [[] for _ in range(num_envs)], [], None current_episode, current_step, scores, best_score = 0, 0, [], -np.inf obs_dict, info = envs.reset() agent_mask_dict = [data['agent_mask'] for data in info] actions_mean_dict = [{k: np.zeros(self.n_actions_max) for k in self.agent_keys} for _ in range(num_envs)] state = envs.buf_state.copy() if self.use_global_state else None avail_actions = envs.buf_avail_actions if self.use_actions_mask else None if test_mode: if self.config.render_mode == "rgb_array" and self.render: images = envs.render(self.config.render_mode) for idx, img in enumerate(images): videos[idx].append(img) else: if self.use_rnn: self.memory.clear_episodes() rnn_hidden = self.init_rnn_hidden(num_envs) while current_episode < n_episodes: policy_out = self.get_actions(obs_dict=obs_dict, agent_mask=agent_mask_dict, act_mean_dict=actions_mean_dict, avail_actions_dict=avail_actions, rnn_hidden=rnn_hidden, test_mode=test_mode) rnn_hidden, actions_dict = policy_out['hidden_state'], policy_out['actions'] actions_mean_next_dict = policy_out['actions_mean'] next_obs_dict, rewards_dict, terminated_dict, truncated, info = envs.step(actions_dict) next_state = envs.buf_state.copy() if self.use_global_state else None next_avail_actions = envs.buf_avail_actions if self.use_actions_mask else None if test_mode: if self.config.render_mode == "rgb_array" and self.render: images = envs.render(self.config.render_mode) for idx, img in enumerate(images): videos[idx].append(img) else: self.store_experience(obs_dict, avail_actions, actions_dict, next_obs_dict, next_avail_actions, rewards_dict, terminated_dict, info, **{'state': state, 'next_state': next_state, 'agent_mask': agent_mask_dict, 'actions_mean': actions_mean_dict, 'actions_mean_next': actions_mean_next_dict}) self.callback.on_test_step(envs=envs, policy=self.policy, images=images, test_mode=test_mode, obs=obs_dict, policy_out=policy_out, acts=actions_dict, actions_mean_dict=actions_mean_dict, next_obs=next_obs_dict, rewards=rewards_dict, terminals=terminated_dict, truncations=truncated, infos=info, state=state, next_state=next_state, current_train_step=self.current_step, n_episodes=n_episodes, current_step=current_step, current_episode=current_episode) obs_dict = deepcopy(next_obs_dict) agent_mask_dict = [data['agent_mask'] for data in info] actions_mean_dict = deepcopy(actions_mean_next_dict) if self.use_global_state: state = deepcopy(next_state) if self.use_actions_mask: avail_actions = deepcopy(next_avail_actions) for i in range(num_envs): if all(terminated_dict[i].values()) or truncated[i]: current_episode += 1 obs_dict[i] = info[i]["reset_obs"] envs.buf_obs[i] = info[i]["reset_obs"] if self.use_global_state: state = info[i]["reset_state"] self.train_envs.buf_state[i] = info[i]["reset_state"] if self.use_actions_mask: avail_actions[i] = info[i]["reset_avail_actions"] envs.buf_avail_actions[i] = info[i]["reset_avail_actions"] agent_mask_dict[i] = {k: True for k in self.agent_keys} actions_mean_dict[i] = {k: np.zeros(self.n_actions_max) for k in self.agent_keys} if self.use_rnn: rnn_hidden = self.init_hidden_item(i_env=i, rnn_hidden=rnn_hidden) if not test_mode: terminal_data = {'obs': next_obs_dict[i], 'actions_mean': actions_mean_next_dict[i], 'episode_step': info[i]['episode_step']} if self.use_global_state: terminal_data['state'] = next_state[i] if self.use_actions_mask: terminal_data['avail_actions'] = next_avail_actions[i] self.memory.finish_path(i, **terminal_data) episode_score = float(np.mean(itemgetter(*self.agent_keys)(info[i]["episode_score"]))) scores.append(episode_score) if test_mode: if best_score < episode_score: best_score = episode_score episode_videos = videos[i].copy() else: self.current_episode[i] += 1 if self.use_wandb: episode_info = { "Train-Results/Episode-Steps/env-%d" % i: info[i]["episode_step"], "Train-Results/Episode-Rewards/env-%d" % i: info[i]["episode_score"] } else: episode_info = { "Train-Results/Episode-Steps": {"env-%d" % i: info[i]["episode_step"]}, "Train-Results/Episode-Rewards": { "env-%d" % i: np.mean(itemgetter(*self.agent_keys)(info[i]["episode_score"]))} } self.current_step += info[i]["episode_step"] self.log_infos(episode_info, self.current_step) self._update_explore_factor() self.callback.on_train_episode_info(envs=self.train_envs, policy=self.policy, env_id=i, infos=info, use_wandb=self.use_wandb, current_step=self.current_step, current_episode=self.current_episode, n_episodes=n_episodes) current_step += num_envs if test_mode: if self.config.render_mode == "rgb_array" and self.render: # time, height, width, channel -> time, channel, height, width videos_info = {"Videos_Test": np.array([episode_videos], dtype=np.uint8).transpose((0, 1, 4, 2, 3))} self.log_videos(info=videos_info, fps=self.fps, x_index=self.current_step) test_info = { "Test-Results/Episode-Rewards": np.mean(scores), "Test-Results/Episode-Rewards-Std": np.std(scores), } self.log_infos(test_info, self.current_step) self.callback.on_test_end(envs=envs, policy=self.policy, current_train_step=self.current_step, current_step=current_step, current_episode=current_episode, scores=scores, best_score=best_score) if close_envs: envs.close() return scores