from __future__ import annotations
import os
import json
import time
import csv
import numpy as np
from argparse import Namespace
from datetime import datetime
from xuance.common import Optional, create_directory
from xuance.environment import DummyVecMultiAgentEnv, SubprocVecMultiAgentEnv
from xuance.engine import RunnerBase
[docs]
class RunnerSC2(RunnerBase):
def __init__(self,
             config: Namespace,
             envs: Optional[DummyVecMultiAgentEnv | SubprocVecMultiAgentEnv] = None,
             agent=None,
             manage_resources: Optional[bool] = None):
    """Set up the runner: base initialisation, agent creation and unit counts.

    Args:
        config: Experiment configuration. ``env_id`` and ``running_steps``
            are read here, ``dl_toolbox`` (default ``'torch'``) selects the
            agent registry and ``agent`` names the registry entry to build.
            ``n_agents`` is written back onto it.
        envs: Environments forwarded to ``RunnerBase.__init__``.
        agent: Ready-made agent. When ``None``, one is built from the
            registry with ``(config, self.envs)``.
        manage_resources: Forwarded unchanged to ``RunnerBase.__init__``.

    Raises:
        NotImplementedError: If ``config.dl_toolbox`` is not ``'torch'``,
            ``'tensorflow'`` or ``'mindspore'``.
    """
    # Store configuration
    self.config = config
    self.env_id = self.config.env_id
    self.running_steps = config.running_steps
    super(RunnerSC2, self).__init__(self.config, envs, agent, manage_resources)

    # Compare the toolbox name explicitly. Testing the truthiness of the
    # getattr() result picks the torch registry for every non-empty value
    # and makes the remaining branches unreachable.
    dl_toolbox = getattr(self.config, 'dl_toolbox', 'torch')
    if dl_toolbox == 'torch':
        from xuance.torch.agents import REGISTRY_Agents
    elif dl_toolbox == 'tensorflow':
        from xuance.tensorflow.agents import REGISTRY_Agents
    elif dl_toolbox == 'mindspore':
        from xuance.mindspore.agents import REGISTRY_Agents
    else:
        raise NotImplementedError(f"Unsupported dl_toolbox: {dl_toolbox!r}")

    self.config.n_agents = self.envs.num_agents
    self.agent = REGISTRY_Agents[self.config.agent](self.config, self.envs) if agent is None else agent
    self.num_agents, self.num_enemies = self.get_agent_num()

    # Distributed training setup (rank-aware behavior)
    if self.agent.distributed_training:
        self.rank = int(os.environ['RANK'])
    elif not hasattr(self, 'rank'):
        # NOTE(review): the run methods compare self.rank with 0; presumably
        # RunnerBase already sets it. Fall back to 0 only when it did not.
        self.rank = 0
[docs]
def get_agent_num(self):
    """Return ``(num_agents, num_enemies)`` as reported by ``self.envs``."""
    envs = self.envs
    n_allies = envs.num_agents
    n_enemies = envs.num_enemies
    return n_allies, n_enemies
[docs]
def get_battles_info(self):
    """Sum the battle counters held by ``self.envs``.

    Returns:
        tuple: ``(battles_game, battles_won, dead_allies, dead_enemies)``,
        each obtained by calling ``.sum()`` on the corresponding counter
        (``battles_game``, ``battles_won``, ``dead_allies_count``,
        ``dead_enemies_count``).
    """
    envs = self.envs
    total_games = envs.battles_game.sum()
    total_wins = envs.battles_won.sum()
    total_dead_allies = envs.dead_allies_count.sum()
    total_dead_enemies = envs.dead_enemies_count.sum()
    return total_games, total_wins, total_dead_allies, total_dead_enemies
[docs]
def get_battles_result(self, last_battles_info):
    """Compute win rate and casualty ratios for the battles since a snapshot.

    Args:
        last_battles_info: Iterable of four earlier totals
            ``(battles_game, battles_won, dead_allies, dead_enemies)``, in
            the order produced by ``get_battles_info``.

    Returns:
        tuple: ``(win_rate, allies_dead_ratio, enemies_dead_ratio)``. Each
        value is ``0.0`` when its denominator is not positive.
    """
    prev_games, prev_wins, prev_dead_allies, prev_dead_enemies = list(last_battles_info)
    envs = self.envs

    # Battles and wins accumulated since the snapshot.
    new_games = float(envs.battles_game.sum() - prev_games)
    new_wins = float(envs.battles_won.sum() - prev_wins)
    if new_games > 0:
        win_rate = new_wins / new_games
    else:
        win_rate = 0.0

    # Number of units that took part in those battles, per side.
    total_allies = new_games * self.num_agents
    total_enemies = new_games * self.num_enemies

    new_dead_allies = float(envs.dead_allies_count.sum() - prev_dead_allies)
    new_dead_enemies = float(envs.dead_enemies_count.sum() - prev_dead_enemies)

    allies_dead_ratio = 0.0
    if total_allies > 0:
        allies_dead_ratio = new_dead_allies / total_allies
    enemies_dead_ratio = 0.0
    if total_enemies > 0:
        enemies_dead_ratio = new_dead_enemies / total_enemies

    return win_rate, allies_dead_ratio, enemies_dead_ratio
[docs]
def test_episodes(self, test_T, n_test_runs):
    """Run ``n_test_runs`` test runs and log the resulting battle statistics.

    Each run asks the agent for ``self.n_envs`` episodes in test mode and
    keeps the mean of the returned scores. Win rate and dead ratios are
    computed over all runs together and logged through ``agent.log_infos``.

    Args:
        test_T: Step value passed to ``agent.log_infos`` with the results.
        n_test_runs: Number of test runs to perform.

    Returns:
        tuple: ``(test_scores, mean, std, win_rate)``, where ``test_scores``
        is a float32 array holding one mean score per run.
    """
    per_run_scores = np.zeros(n_test_runs, np.float32)
    battles_before = self.get_battles_info()

    for run_idx in range(n_test_runs):
        per_run_scores[run_idx] = np.mean(
            self.agent.run_episodes(
                n_episodes=self.n_envs,  # Number of testing episodes
                run_envs=None,  # The running envs. If None, `self.agent.train_envs` is used.
                test_mode=True,  # Test mode is on.
                close_envs=False,  # Don't close the testing envs (self.agent.train_envs).
            )
        )

    win_rate, allies_dead_ratio, enemies_dead_ratio = self.get_battles_result(battles_before)
    mean_test_score = per_run_scores.mean()

    results_info = {
        "Test-Results/Win-Rate": win_rate,
        "Test-Results/Allies-Dead-Ratio": allies_dead_ratio,
        "Test-Results/Enemies-Dead-Ratio": enemies_dead_ratio,
    }
    self.agent.log_infos(results_info, test_T)

    score_std = per_run_scores.std()
    return per_run_scores, mean_test_score, score_std, win_rate
def _run_train(self, **kwargs):
    """Train the agent and periodically report training battle statistics.

    While ``agent.current_step <= self.running_steps`` each iteration
    collects ``self.n_envs`` training episodes and, once ``current_step`` has
    reached ``agent.start_training``, runs one training epoch. On rank 0,
    whenever another ``config.eval_interval`` steps have elapsed, the win
    rate and dead ratios accumulated since the previous report are logged
    and printed with a time estimate. The model is finally saved as
    ``final_train_model.pth``.

    Args:
        **kwargs: Accepted but not read by this method.
    """
    eval_interval = self.config.eval_interval
    report_step = 0
    recent_scores = []
    agent_info = f"Algo: {self.config.agent}, Map: {self.config.env_id}, seed: {self.config.seed}, "
    self.rprint(f"Steps: {self.agent.current_step} / {self.running_steps}: ")
    self.rprint(agent_info, "Win rate: %-, Mean score: -.")
    battles_snapshot = self.get_battles_info()
    time_start = time.time()

    while self.agent.current_step <= self.running_steps:
        score = self.agent.run_episodes(None, n_episodes=self.n_envs, test_mode=False)
        if self.agent.current_step >= self.agent.start_training:
            epoch_info = self.agent.train_epochs(n_epochs=1)
            self.agent.log_infos(epoch_info, self.agent.current_step)
        recent_scores.append(np.mean(score))

        # Only rank 0 reports, and only once a full interval has elapsed.
        report_due = self.rank == 0 and (self.agent.current_step - report_step) / eval_interval >= 1.0
        if not report_due:
            continue

        report_step += eval_interval
        # log train results before testing.
        train_win_rate, allies_dead_ratio, enemies_dead_ratio = self.get_battles_result(battles_snapshot)
        self.agent.log_infos(
            {
                "Train-Results/Win-Rate": train_win_rate,
                "Train-Results/Allies-Dead-Ratio": allies_dead_ratio,
                "Train-Results/Enemies-Dead-Ratio": enemies_dead_ratio,
            },
            report_step,
        )
        battles_snapshot = self.get_battles_info()
        time_pass, time_left = self.time_estimate(time_start)
        print(f"Steps: {self.agent.current_step} / {self.running_steps}: ")
        print(agent_info, "Win rate: %.3f, Mean score: %.2f. " % (train_win_rate, np.mean(recent_scores)),
              time_pass, time_left)
        recent_scores = []

    self.rprint("Finish training.")
    self.agent.save_model("final_train_model.pth")
def _run_test(self, **kwargs):
    """Load a saved model on rank 0 and print its test win rate and mean score.

    Keyword Args:
        model_path: Location handed to ``agent.load_model``; defaults to
            ``agent.model_dir_load``.
        test_episodes: Episode budget for the test; defaults to
            ``config.test_episode``.
    """
    model_path = kwargs.get('model_path', self.agent.model_dir_load)
    test_episodes = kwargs.get('test_episodes', self.config.test_episode)
    # `self.test_episodes` plays `self.n_envs` episodes per run, so the
    # episode budget has to be converted into a number of runs. This mirrors
    # the `test_episodes // self.n_envs` conversion in `_run_benchmark`, but
    # never drops below a single run.
    n_test_runs = max(1, test_episodes // self.n_envs)
    if self.rank == 0:
        self.agent.load_model(model_path)
        _, test_score_mean, test_score_std, test_win_rate = self.test_episodes(0, n_test_runs)
        agent_info = f"Algo: {self.config.agent}, Map: {self.config.env_id}, seed: {self.config.seed}, "
        print(agent_info, "Win rate: %.3f, Mean score: %.2f. " % (test_win_rate, test_score_mean))
        print("Finish testing.")
def _run_benchmark(self, **kwargs):
    """Train the agent with periodic testing and store the benchmark artifacts.

    Rank 0 tests the model once before training and again each time another
    ``eval_interval`` steps have elapsed. Every evaluation appends one row to
    ``test_scores.csv`` (one column per test run) and one row to
    ``learning_curve.csv``. The model is saved as ``best_model.pth`` under
    ``<result dir>/best_model`` whenever the test win rate improves, and
    ``meta_data.json``, ``config.json`` and ``best_model_info.json`` are
    written when training ends.

    Keyword Args:
        running_steps: Step budget of the training loop; defaults to
            ``config.running_steps``.
        eval_interval: Steps between evaluations; defaults to
            ``config.eval_interval``.
        test_episodes: Episode budget per evaluation; defaults to
            ``config.test_episode``. It is converted into test runs of
            ``self.n_envs`` episodes each (at least one run).
        benchmark_result_path: Output directory, joined onto the current
            working directory; defaults to ``config.result_dir``.
    """
    running_steps = kwargs.get('running_steps', self.config.running_steps)
    eval_interval = kwargs.get('eval_interval', self.config.eval_interval)
    test_episodes = kwargs.get('test_episodes', self.config.test_episode)
    # Always perform at least one run so the score statistics are never
    # computed over an empty array.
    n_test_runs = max(1, test_episodes // self.n_envs)
    is_main_rank = self.rank == 0

    # Prepare directory for storing benchmark results.
    benchmark_result_path = os.path.join(
        os.getcwd(), kwargs.get('benchmark_result_path', self.config.result_dir))
    best_model_path = os.path.join(benchmark_result_path, "best_model")
    create_directory(benchmark_result_path)

    test_scores_csv = os.path.join(benchmark_result_path, "test_scores.csv")
    learning_curve_csv = os.path.join(benchmark_result_path, "learning_curve.csv")

    def record_scores(step, scores):
        """Append one evaluation (per-run scores and their average) to both CSV files."""
        # Convert to plain Python floats first: `list + ndarray` would add
        # the step to every score element-wise instead of concatenating.
        score_list = np.asarray(scores).tolist()
        with open(test_scores_csv, "a", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow([int(step)] + score_list)
        with open(learning_curve_csv, "a", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow([int(step), float(np.mean(scores))])

    if is_main_rank:
        # Create the CSV files; one score column per test run so the header
        # width matches the rows appended by `record_scores`.
        with open(test_scores_csv, "w", newline="", encoding="utf-8") as f:
            header = ["step"] + [f"return_episode_{i}" for i in range(n_test_runs)]
            csv.writer(f).writerow(header)
        with open(learning_curve_csv, "w", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow(["step", "avg_return"])

    # NOTE: this is the agent's own dict, so the entries added below are
    # visible on `self.agent.meta_data` as well.
    meta_data = self.agent.meta_data
    meta_data['benchmark'] = {
        "running_steps": running_steps,
        "eval_interval": eval_interval,
        "test_episodes": test_episodes,
    }
    meta_data["system_info"] = self.collect_device_info()
    config_dict = vars(self.agent.config).copy()
    config_dict.pop("observation_space", None)
    config_dict.pop("action_space", None)

    # Start benchmarking...
    start_time = time.time()
    start_time_iso = datetime.now().astimezone().isoformat()
    best_model_time_iso = start_time_iso

    # Must be defined before the first evaluation, which logs against it.
    last_test_T = 0
    if is_main_rank:
        # test the model at step 0
        test_scores, test_score_mean, test_score_std, test_win_rate = self.test_episodes(last_test_T, n_test_runs)
        record_scores(self.agent.current_step, test_scores)
    else:
        # Other ranks never test; give the reporting code below defined values.
        test_score_mean, test_score_std, test_win_rate = 0, 0, 0
    best_win_rate = test_win_rate

    agent_info = f"Algo: {self.config.agent}, Map: {self.config.env_id}, seed: {self.config.seed}, "
    self.rprint(f"Steps: {self.agent.current_step} / {running_steps}: ")
    self.rprint(agent_info, "Win rate: %.3f, Mean score: %.2f. " % (test_win_rate, test_score_mean))
    last_battles_info = self.get_battles_info()
    # Stored as built-in numbers so json.dump can serialise them
    # (NumPy float32 scalars are rejected by the json module).
    best_scores_info = {"mean": float(test_score_mean),
                        "std": float(test_score_std),
                        "step": int(self.agent.current_step)}

    while self.agent.current_step <= running_steps:
        # train
        self.agent.run_episodes(
            n_episodes=self.n_envs,  # Number of training episodes
            run_envs=None,  # The running envs. If None, `self.agent.train_envs` is used.
            test_mode=False,  # Test mode is off.
            close_envs=False  # Don't close the envs (self.agent.train_envs).
        )
        if self.agent.current_step >= self.agent.start_training:
            train_info = self.agent.train_epochs(n_epochs=self.n_envs)
            self.agent.log_infos(train_info, self.agent.current_step)

        # test
        if is_main_rank and (self.agent.current_step - last_test_T) / eval_interval >= 1.0:
            last_test_T += eval_interval
            # log train results before testing.
            train_win_rate, allies_dead_ratio, enemies_dead_ratio = self.get_battles_result(last_battles_info)
            results_info = {"Train-Results/Win-Rate": train_win_rate,
                            "Train-Results/Allies-Dead-Ratio": allies_dead_ratio,
                            "Train-Results/Enemies-Dead-Ratio": enemies_dead_ratio}
            self.agent.log_infos(results_info, last_test_T)

            # test the model
            test_scores, test_score_mean, test_score_std, test_win_rate = self.test_episodes(last_test_T,
                                                                                             n_test_runs)
            record_scores(self.agent.current_step, test_scores)

            if test_score_mean > best_scores_info["mean"]:
                best_scores_info = {"mean": float(test_score_mean),
                                    "std": float(test_score_std),
                                    "step": int(self.agent.current_step)}
            if test_win_rate > best_win_rate:
                best_win_rate = test_win_rate
                self.agent.save_model("best_model.pth", model_path=best_model_path)  # save best model
                best_model_time_iso = datetime.now().astimezone().isoformat()

            last_battles_info = self.get_battles_info()

            # Estimate the wall-clock running time.
            time_pass, time_left = self.time_estimate(start_time)
            print(f"Steps: {self.agent.current_step} / {running_steps}: ")
            print(agent_info, "Win rate: %.3f, Mean score: %.2f. " % (test_win_rate, test_score_mean),
                  time_pass, time_left)

    # End benchmarking.
    # Save best model information.
    end_time = time.time()
    end_time_iso = datetime.now().astimezone().isoformat()
    meta_data["timestamps"] = {
        "start_time": start_time_iso,
        "best_model_time": best_model_time_iso,
        "end_time": end_time_iso,
        "elapsed_seconds": round(end_time - start_time, 3),
    }

    if is_main_rank:
        with open(os.path.join(benchmark_result_path, "meta_data.json"), "w", encoding='utf-8') as f:
            json.dump(meta_data, f, indent=2, ensure_ascii=False)
        with open(os.path.join(benchmark_result_path, "config.json"), "w", encoding='utf-8') as f:
            json.dump(config_dict, f, indent=2, ensure_ascii=False)
        with open(os.path.join(benchmark_result_path, "best_model_info.json"), "w", encoding='utf-8') as f:
            json.dump(best_scores_info, f, indent=2, ensure_ascii=False)

    self.rprint("Finish benchmarking.")
    self.rprint("Best Score: %.4f, Std: %.4f. "
                "Best Step: %d" % (best_scores_info["mean"], best_scores_info["std"],
                                   best_scores_info['step']))
    self.rprint("Best Win Rate: {}%".format(best_win_rate * 100))
[docs]
def time_estimate(self, start):
    """Format the elapsed time and a linear estimate of the time remaining.

    The remaining time is extrapolated from the average time per step so
    far: ``(running_steps - current_step) / current_step * elapsed``.

    Args:
        start: ``time.time()`` timestamp taken when the run started.

    Returns:
        tuple[str, str]: ``("Time pass: {h}h{m}m{s}s,",
        "Time left: {h}h{m}m{s}s")``. The time left is reported as zero
        when the estimate is negative or when no step has been taken yet.
    """
    current_step = self.agent.current_step
    time_pass = int(time.time() - start)

    if current_step > 0:
        time_left = int((self.running_steps - current_step) / current_step * time_pass)
        time_left = max(time_left, 0)
    else:
        # No progress yet: there is no rate to extrapolate from, and the
        # division above would fail on a zero step count.
        time_left = 0

    hours_pass, rest_pass = divmod(time_pass, 3600)
    min_pass, sec_pass = divmod(rest_pass, 60)
    hours_left, rest_left = divmod(time_left, 3600)
    min_left, sec_left = divmod(rest_left, 60)

    INFO_time_pass = f"Time pass: {hours_pass}h{min_pass}m{sec_pass}s,"
    INFO_time_left = f"Time left: {hours_left}h{min_left}m{sec_left}s"
    return INFO_time_pass, INFO_time_left