from __future__ import annotations
import os
import json
import time
import csv
import numpy as np
from copy import deepcopy
from argparse import Namespace
from datetime import datetime
from xuance.common import Optional, create_directory
from xuance.environment import DummyVecMultiAgentEnv, SubprocVecMultiAgentEnv, make_envs
from xuance.engine import RunnerBase
# Multi-agent reinforcement learning (MARL) runner.
class RunnerMARL(RunnerBase):
    """Runner that drives training, testing and benchmarking of a multi-agent agent.

    The agent is taken from the ``agent`` argument when one is supplied;
    otherwise it is built from the ``REGISTRY_Agents`` table of the deep-learning
    toolbox named by ``config.dl_toolbox`` ('torch', 'tensorflow' or 'mindspore').
    """

    def __init__(self,
                 config: Namespace,
                 envs: Optional[DummyVecMultiAgentEnv | SubprocVecMultiAgentEnv] = None,
                 agent=None,
                 manage_resources: Optional[bool] = None):
        """Store the configuration, pick the toolbox backend and build the agent.

        Args:
            config: Experiment configuration. Must provide ``env_id`` and
                ``agent``; ``dl_toolbox`` defaults to ``'torch'`` when absent.
            envs: Optional vectorized multi-agent environments, forwarded to
                ``RunnerBase.__init__``.
            agent: Optional pre-built agent. When ``None`` the agent is created
                from ``REGISTRY_Agents[config.agent]``.
            manage_resources: Forwarded unchanged to ``RunnerBase.__init__``.

        Raises:
            NotImplementedError: If ``config.dl_toolbox`` is not one of the
                supported toolboxes.
            KeyError: If distributed training is enabled but the ``RANK``
                environment variable is not set.
        """
        # Store configuration
        self.config = config
        self.env_id = self.config.env_id
        super(RunnerMARL, self).__init__(self.config, envs, agent, manage_resources)

        # Select the backend by *comparing* the toolbox name. A bare truthiness
        # test on the name would route every non-empty value to torch.
        dl_toolbox = getattr(self.config, 'dl_toolbox', 'torch')
        if dl_toolbox == 'torch':
            from xuance.torch.agents import REGISTRY_Agents
            from xuance.torch.utils import collect_device_info
        elif dl_toolbox == 'tensorflow':
            from xuance.tensorflow.agents import REGISTRY_Agents
            from xuance.tensorflow.utils import collect_device_info
        elif dl_toolbox == 'mindspore':
            from xuance.mindspore.agents import REGISTRY_Agents
            from xuance.mindspore.utils import collect_device_info
        else:
            raise NotImplementedError(f"Unsupported dl_toolbox: {dl_toolbox!r}")
        self.collect_device_info = collect_device_info

        # Build agent if not injected externally
        self.agent = REGISTRY_Agents[self.config.agent](self.config, self.envs) if agent is None else agent

        # Distributed training setup (rank-aware behavior)
        if self.agent.distributed_training:
            self.rank = int(os.environ['RANK'])
        elif not hasattr(self, 'rank'):
            # The test/benchmark methods read ``self.rank`` unconditionally, so
            # make sure it exists even when the base class did not define it.
            self.rank = 0

    def _run_train(self, **kwargs):
        """Train for ``config.running_steps`` environment steps and save the final model.

        The number of agent training steps is ``running_steps // n_envs``,
        with a minimum of one step.
        """
        n_train_steps = max(1, self.config.running_steps // self.n_envs)
        self.agent.train(n_train_steps)
        self.rprint("Finish training.")
        self.agent.save_model(model_name="final_train_model.pth")

    def _run_test(self, **kwargs):
        """Load a saved model and evaluate it on freshly created environments.

        Keyword Args:
            n_envs: Number of parallel test environments (default 1).
            render: Whether to render the environments (default True).
            render_mode: Render mode; defaults to ``config.render_mode`` or 'human'.
            model_path: Model location; defaults to ``agent.model_dir_load``.
            test_episodes: Number of episodes; defaults to ``config.test_episode``.
        """
        config_test = deepcopy(self.config)
        config_test.parallels = kwargs.get("n_envs", 1)
        config_test.render = kwargs.get('render', True)
        config_test.render_mode = kwargs.get('render_mode', getattr(self.config, 'render_mode', 'human'))
        model_path = kwargs.get('model_path', self.agent.model_dir_load)
        test_episodes = kwargs.get('test_episodes', self.config.test_episode)
        test_envs = make_envs(config_test)

        if self.rank != 0:
            # Only rank 0 evaluates; release the environments created on the
            # other ranks instead of leaking them.
            test_envs.close()
            return

        self.agent.load_model(model_path)
        # ``close_envs=True`` asks the agent to close the environments after testing.
        scores = self.agent.test(test_episodes=test_episodes, test_envs=test_envs, close_envs=True)
        print("\n---------------------Testing Results--------------------")
        print("Test Episode Scores: ", scores)
        print(f"Mean Score: {np.mean(scores)}, Std: {np.std(scores)}")
        print("Best Score: %.2f" % max(scores))
        print("Finish testing.")

    @staticmethod
    def _summarize_scores(scores, step):
        """Return a JSON-serializable mean/std/step summary of ``scores``."""
        # Cast to builtin types: json cannot encode numpy integer scalars or
        # non-float64 numpy floats.
        return {"mean": float(np.mean(scores)),
                "std": float(np.std(scores)),
                "step": int(step)}

    def _evaluate_and_log(self, test_envs, test_episodes, test_scores_csv, learning_curve_csv):
        """Run test episodes and append the results to both benchmark CSV files.

        Returns:
            The per-episode scores returned by ``agent.test``, as a list.
        """
        scores = list(self.agent.test(test_episodes=test_episodes,
                                      test_envs=test_envs,
                                      close_envs=False))
        step = int(self.agent.current_step)
        with open(test_scores_csv, "a", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow([step, *scores])
        with open(learning_curve_csv, "a", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow([step, np.mean(scores)])
        return scores

    def _run_benchmark(self, **kwargs):
        """Alternate training and evaluation, tracking and saving the best model.

        Rank 0 writes ``test_scores.csv``, ``learning_curve.csv``,
        ``meta_data.json``, ``config.json`` and ``best_model_info.json`` into
        the benchmark result directory.

        Keyword Args:
            running_steps: Total environment steps; defaults to ``config.running_steps``.
            eval_interval: Environment steps between evaluations; defaults to
                ``config.eval_interval``.
            test_episodes: Episodes per evaluation; defaults to ``config.test_episode``.
            benchmark_result_path: Output directory relative to the current
                working directory; defaults to ``config.result_dir``.
        """
        running_steps = kwargs.get('running_steps', self.config.running_steps)
        eval_interval = kwargs.get('eval_interval', self.config.eval_interval)
        test_episodes = kwargs.get('test_episodes', self.config.test_episode)
        benchmark_result_path = kwargs.get('benchmark_result_path', self.config.result_dir)

        # Prepare directory for storing benchmark results.
        benchmark_result_path = os.path.join(os.getcwd(), benchmark_result_path)
        best_model_path = os.path.join(benchmark_result_path, "best_model")
        create_directory(benchmark_result_path)

        # Create the CSV files (with headers) that store the testing scores.
        test_scores_csv = os.path.join(benchmark_result_path, "test_scores.csv")
        learning_curve_csv = os.path.join(benchmark_result_path, "learning_curve.csv")
        if self.rank == 0:
            with open(test_scores_csv, "w", newline="", encoding="utf-8") as f:
                header = ["step"] + [f"return_episode_{i}" for i in range(test_episodes)]
                csv.writer(f).writerow(header)
            with open(learning_curve_csv, "w", newline="", encoding="utf-8") as f:
                csv.writer(f).writerow(["step", "avg_return"])

        meta_data = self.agent.meta_data
        meta_data['benchmark'] = {
            "running_steps": running_steps,
            "eval_interval": eval_interval,
            "test_episodes": test_episodes,
        }
        meta_data["system_info"] = self.collect_device_info(rank=int(getattr(self, "rank", 0)), agent=self.agent)
        config_dict = vars(self.agent.config).copy()
        config_dict.pop("observation_space", None)
        config_dict.pop("action_space", None)

        # Convert environment-step budgets into per-call agent training steps.
        train_steps = max(1, running_steps // self.n_envs)
        eval_interval = max(1, eval_interval // self.n_envs)
        # NOTE(review): integer division drops any remainder steps and yields
        # zero epochs when eval_interval exceeds train_steps - confirm intended.
        num_epoch = train_steps // eval_interval

        # Prepare testing environments.
        config_test = deepcopy(self.config)
        config_test.parallels = 1  # config_test.test_episode
        test_envs = make_envs(config_test)
        try:
            # Start benchmarking...
            start_time = time.time()
            start_time_iso = datetime.now().astimezone().isoformat()
            best_model_time_iso = start_time_iso

            # Baseline evaluation before any training in this run.
            if self.rank == 0:
                test_scores = self._evaluate_and_log(test_envs, test_episodes,
                                                     test_scores_csv, learning_curve_csv)
            else:
                test_scores = 0.0
            best_scores_info = self._summarize_scores(test_scores, self.agent.current_step)

            for i_epoch in range(num_epoch):
                self.rprint("Epoch: %d/%d:" % (i_epoch, num_epoch))
                self.agent.train(train_steps=eval_interval)
                if self.rank == 0:
                    test_scores = self._evaluate_and_log(test_envs, test_episodes,
                                                         test_scores_csv, learning_curve_csv)
                    if np.mean(test_scores) > best_scores_info["mean"]:
                        best_scores_info = self._summarize_scores(test_scores, self.agent.current_step)
                        # save best model
                        self.agent.save_model(model_name="best_model.pth", model_path=best_model_path)
                        best_model_time_iso = datetime.now().astimezone().isoformat()

            # End benchmarking.
            # Save best model information.
            end_time = time.time()
            end_time_iso = datetime.now().astimezone().isoformat()
            meta_data["timestamps"] = {
                "start_time": start_time_iso,
                "best_model_time": best_model_time_iso,
                "end_time": end_time_iso,
                "elapsed_seconds": round(end_time - start_time, 3),
            }
            if self.rank == 0:
                with open(os.path.join(benchmark_result_path, "meta_data.json"), "w", encoding='utf-8') as f:
                    json.dump(meta_data, f, indent=2, ensure_ascii=False)
                with open(os.path.join(benchmark_result_path, "config.json"), "w", encoding='utf-8') as f:
                    json.dump(config_dict, f, indent=2, ensure_ascii=False)
                with open(os.path.join(benchmark_result_path, "best_model_info.json"), "w", encoding='utf-8') as f:
                    json.dump(best_scores_info, f, indent=2, ensure_ascii=False)
            self.agent.save_model(model_name="final_train_model.pth")
        finally:
            # Release the test environments even if training or evaluation fails.
            test_envs.close()

        self.rprint("Best Model Score: %.2f, std=%.2f. "
                    "Best Step: %d" % (best_scores_info["mean"], best_scores_info["std"],
                                       best_scores_info["step"]))