Source code for xuance.engine.run_competition

import os
from argparse import Namespace
from copy import deepcopy
import numpy as np
from tqdm import tqdm
from operator import itemgetter
from xuance.environment import make_envs
from xuance.environment.vector_envs import combine_actions


[docs] class RunnerCompetition(object): def __init__(self, configs): self.configs = configs # build environments self.envs = make_envs(self.configs[0]) self.n_envs = self.envs.num_envs self.current_step = 0 self.envs.reset() self.groups_info = self.envs.groups_info self.groups = self.groups_info['agent_groups'] self.num_groups = self.groups_info['num_groups'] self.obs_space_groups = self.groups_info['observation_space_groups'] self.act_space_groups = self.groups_info['action_space_groups'] assert len(configs) == self.num_groups, "Number of groups must be equal to the number of methods." self.agents = [] for group in range(self.num_groups): _env = Namespace(num_agents=len(self.groups[group]), num_envs=self.envs.num_envs, agents=self.groups[group], state_space=self.envs.state_space, observation_space=self.obs_space_groups[group], action_space=self.act_space_groups[group], max_episode_steps=self.envs.max_episode_steps) from xuance.torch.agents import REGISTRY_Agents self.agents.append(REGISTRY_Agents[self.configs[group].agent](self.configs[group], _env)) self.distributed_training = self.agents[0].distributed_training self.use_actions_mask = self.agents[0].use_actions_mask self.use_global_state = self.agents[0].use_global_state self.use_rnn = self.agents[0].use_rnn self.use_wandb = self.agents[0].use_wandb self.rank = 0 if self.distributed_training: self.rank = int(os.environ['RANK'])
[docs] def rprint(self, info: str): if self.rank == 0: print(info)
[docs] def run(self): if self.configs[0].test_mode: def env_fn(): config_test = deepcopy(self.configs[0]) config_test.parallels = 1 config_test.render = True return make_envs(config_test) for agent in self.agents: agent.render = True agent.load_model(agent.model_dir_load) scores = self.test(env_fn, self.configs[0].test_episode) print(f"Mean Score: {scores}, Std: {scores}") print("Finish testing.") else: n_train_steps = self.configs[0].running_steps // self.n_envs self.train(n_train_steps) print("Finish training.") for agent in self.agents: agent.save_model("final_train_model.pth") for agent in self.agents: agent.finish() self.envs.close()
[docs] def benchmark(self): def env_fn(): config_test = deepcopy(self.configs[0]) config_test.parallels = 1 # config_test.test_episode return make_envs(config_test) train_steps = self.configs[0].running_steps // self.n_envs eval_interval = self.configs[0].eval_interval // self.n_envs test_episode = self.configs[0].test_episode num_epoch = int(train_steps / eval_interval) test_scores = self.test(env_fn, test_episode) if self.rank == 0 else 0.0 best_scores_info = [{"mean": np.mean(test_scores[i]), "std": np.std(test_scores[i]), "step": self.agents[i].current_step} for i in range(self.num_groups)] for i_epoch in range(num_epoch): print("Epoch: %d/%d:" % (i_epoch, num_epoch)) self.train(eval_interval) if self.rank == 0: test_scores = self.test(env_fn, test_episode) for i in range(self.num_groups): if np.mean(test_scores[i]) > best_scores_info[i]["mean"]: best_scores_info[i] = {"mean": np.mean(test_scores[i]), "std": np.std(test_scores[i]), "step": self.agents[i].current_step} # save best model self.agents[i].save_model(model_name="best_model.pth") # end benchmarking best_scores = [score["mean"] for score in best_scores_info] std_list = [score["std"] for score in best_scores_info] print(f"The training for {self.configs[0].env_name}/{self.configs[0].env_id} is finished.") print("Algorithms: ", [config.agent for config in self.configs]) print("Best Model Score: ,", best_scores, "Std: ", std_list) for i in range(self.num_groups): self.agents[i].finish() self.envs.close()
[docs] def train(self, n_steps): """ Train the model for numerous steps. Args: n_steps (int): Number of steps to train the model: """ if self.use_rnn: with tqdm(total=n_steps) as process_bar: step_start, step_last = deepcopy(self.current_step), deepcopy(self.current_step) n_steps_all = n_steps * self.n_envs while step_last - step_start < n_steps_all: self.run_episodes(None, n_episodes=self.n_envs, test_mode=False) for agent in self.agents: if self.current_step >= agent.start_training: train_info = agent.train_epochs(n_epochs=agent.n_epochs) agent.log_infos(train_info, self.current_step) process_bar.update((self.current_step - step_last) // self.n_envs) step_last = deepcopy(self.current_step) process_bar.update(n_steps - process_bar.last_print_n) return obs_dict = self.envs.buf_obs avail_actions = self.envs.buf_avail_actions if self.use_actions_mask else None state = self.envs.buf_state.copy() if self.use_global_state else None for _ in tqdm(range(n_steps)): step_info = {} policy_out_list = [agent.action(obs_dict=obs_dict, state=state, avail_actions_dict=avail_actions, test_mode=False) for agent in self.agents] actions_execute = combine_actions(policy_out_list, self.n_envs) next_obs_dict, rewards_dict, terminated_dict, truncated, info = self.envs.step(actions_execute) next_state = self.envs.buf_state.copy() if self.use_global_state else None next_avail_actions = self.envs.buf_avail_actions.copy() if self.use_actions_mask else None for agent in self.agents: agent.store_experience(obs_dict, avail_actions, actions_execute, next_obs_dict, next_avail_actions, rewards_dict, terminated_dict, info, **{'state': state, 'next_state': next_state}) if self.current_step >= agent.start_training and self.current_step % agent.training_frequency == 0: train_info = agent.train_epochs(n_epochs=agent.n_epochs) agent.log_infos(train_info, self.current_step) obs_dict = next_obs_dict if self.use_global_state: state = deepcopy(next_state) if self.use_actions_mask: avail_actions = deepcopy(next_avail_actions) for i in range(self.n_envs): if all(terminated_dict[i].values()) or truncated[i]: obs_dict[i] = info[i]["reset_obs"] self.envs.buf_obs[i] = info[i]["reset_obs"] if self.use_global_state: state = info[i]["reset_state"] self.envs.buf_state[i] = info[i]["reset_state"] if self.use_actions_mask: avail_actions[i] = info[i]["reset_avail_actions"] self.envs.buf_avail_actions[i] = info[i]["reset_avail_actions"] for agent in self.agents: episode_score = np.mean(itemgetter(*agent.agent_keys)(info[i]["episode_score"])) if self.use_wandb: step_info[f"Train-Results/Episode-Steps/rank_{self.rank}/env-%d" % i] = info[i][ "episode_step"] step_info[f"Train-Results/Episode-Rewards/rank_{self.rank}/env-%d" % i] = episode_score else: step_info[f"Train-Results/Episode-Steps/rank_{self.rank}"] = { "env-%d" % i: info[i]["episode_step"]} step_info[f"Train-Results/Episode-Rewards/rank_{self.rank}"] = {"env-%d" % i: episode_score} agent.log_infos(step_info, self.current_step) self.current_step += self.n_envs for agent in self.agents: agent.current_step += self.n_envs if not agent.on_policy: agent._update_explore_factor()
[docs] def run_episodes(self, env_fn=None, n_episodes: int = 1, test_mode: bool = False): """ Run some episodes when use RNN. Parameters: env_fn: The function that can make some testing environments. n_episodes (int): Number of episodes. test_mode (bool): Whether to test the model. Returns: Scores: The episode scores. """ envs = self.envs if env_fn is None else env_fn() num_envs = envs.num_envs videos = [[] for _ in range(num_envs)] episode_videos = [[] for _ in range(self.num_groups)] episode_count = 0 scores = [[0.0 for _ in range(num_envs)] for _ in range(self.num_groups)] best_score = [-np.inf for _ in range(self.num_groups)] obs_dict, info = envs.reset() state = envs.buf_state.copy() if self.use_global_state else None avail_actions = envs.buf_avail_actions if self.use_actions_mask else None if test_mode: for config in self.configs: if config.render_mode == "rgb_array" and config.render: images = envs.render(config.render_mode) for idx, img in enumerate(images): videos[idx].append(img) else: if self.use_rnn: for agent in self.agents: agent.memory.clear_episodes() rnn_hidden = [agent.init_rnn_hidden(num_envs) for agent in self.agents] while episode_count < n_episodes: step_info = {} policy_out_list = [agent.action(obs_dict=obs_dict, state=state, avail_actions_dict=avail_actions, rnn_hidden=rnn_hidden[i_agt], test_mode=test_mode) for i_agt, agent in enumerate(self.agents)] actions_execute = combine_actions(policy_out_list, num_envs) rnn_hidden = [policy_out['hidden_state'] for policy_out in policy_out_list] next_obs_dict, rewards_dict, terminated_dict, truncated, info = envs.step(actions_execute) next_state = envs.buf_state.copy() if self.use_global_state else None next_avail_actions = envs.buf_avail_actions if self.use_actions_mask else None if test_mode: for config in self.configs: if config.render_mode == "rgb_array" and config.render: images = envs.render(config.render_mode) for idx, img in enumerate(images): videos[idx].append(img) else: for agent in self.agents: agent.store_experience(obs_dict, avail_actions, actions_execute, next_obs_dict, next_avail_actions, rewards_dict, terminated_dict, info, **{'state': state, 'next_state': next_state}) obs_dict = deepcopy(next_obs_dict) if self.use_global_state: state = deepcopy(next_state) if self.use_actions_mask: avail_actions = deepcopy(next_avail_actions) for i in range(num_envs): if all(terminated_dict[i].values()) or truncated[i]: episode_count += 1 obs_dict[i] = info[i]["reset_obs"] envs.buf_obs[i] = info[i]["reset_obs"] if self.use_global_state: state = info[i]["reset_state"] self.envs.buf_state[i] = info[i]["reset_state"] if self.use_actions_mask: avail_actions[i] = info[i]["reset_avail_actions"] envs.buf_avail_actions[i] = info[i]["reset_avail_actions"] if self.use_rnn: rnn_hidden = [agent.init_hidden_item(i_env=i, rnn_hidden=rnn_hidden) for agent in self.agents] if not test_mode: terminal_data = {'obs': next_obs_dict[i], 'episode_step': info[i]['episode_step']} if self.use_global_state: terminal_data['state'] = next_state[i] if self.use_actions_mask: terminal_data['avail_actions'] = next_avail_actions[i] for agent in self.agents: agent.memory.finish_path(i, **terminal_data) if not test_mode: self.current_step += info[i]["episode_step"] for agent in self.agents: agent.current_step += info[i]["episode_step"] for i_group in range(self.num_groups): episode_score = float(np.mean(itemgetter(*self.groups[i_group])(info[i]["episode_score"]))) scores[i_group].append(episode_score) if test_mode: if best_score[i_group] < episode_score: best_score[i_group] = episode_score episode_videos[i_group] = videos[i].copy() if self.configs[i_group].test_mode: print("Episode: %d, Score: %.2f" % (episode_count, episode_score)) else: for agent in self.agents: episode_score = np.mean(itemgetter(*agent.agent_keys)(info[i]["episode_score"])) if self.use_wandb: step_info["Train-Results/Episode-Steps/env-%d" % i] = info[i]["episode_step"] step_info["Train-Results/Episode-Rewards/env-%d" % i] = episode_score else: step_info["Train-Results/Episode-Steps"] = {"env-%d" % i: info[i]["episode_step"]} step_info["Train-Results/Episode-Rewards"] = {"env-%d" % i: episode_score} agent.log_infos(step_info, self.current_step) if not agent.on_policy: agent._update_explore_factor() if test_mode: for i_group in range(self.num_groups): config = self.configs[i_group] if config.render_mode == "rgb_array" and config.render: # time, height, width, channel -> time, channel, height, width videos_info = {"Videos_Test": np.array([episode_videos[i_group]], dtype=np.uint8).transpose((0, 1, 4, 2, 3))} self.agents[i_group].log_videos(info=videos_info, fps=config.fps, x_index=self.current_step) if self.configs[0].test_mode: print("Best Score: ", best_score) for i_group in range(self.num_groups): test_info = { "Test-Results/Episode-Rewards": np.mean(scores[i_group]), "Test-Results/Episode-Rewards-Std": np.std(scores[i_group]), } self.agents[i_group].log_infos(test_info, self.current_step) if env_fn is not None: envs.close() return scores
[docs] def test(self, env_fn, n_episodes): """ Test the model for some episodes. Parameters: env_fn: The function that can make some testing environments. n_episodes (int): Number of episodes to test. Returns: scores (List(float)): A list of cumulative rewards for each episode. """ scores = self.run_episodes(env_fn=env_fn, n_episodes=n_episodes, test_mode=True) return scores