Source code for xuance.common.memory_tools_marl

import numpy as np
from abc import ABC, abstractmethod
from gymnasium.spaces import Space
from typing import List, Dict, Optional
from xuance.common import create_memory
from xuance.environment.utils import space2shape


[docs] class BaseBuffer(ABC): """ Basic buffer for MARL algorithms. Args: agent_keys (List[str]): Keys that identify each agent. state_space (Dict[str, Space]): Global state space, type: Discrete, Box. observation_space (Dict[str, Dict[str, Space]]): Observation space for one agent. action_space (Dict[str, Dict[str, Space]]): Action space for one agent. n_envs (int): Number of parallel environments. buffer_size (int): Buffer size of total experience data. """ def __init__( self, agent_keys: List[str], state_space: Dict[str, Space] = None, observation_space: Dict[str, Dict[str, Space]] = None, action_space: Dict[str, Dict[str, Space]] = None, num_envs: int = 1, buffer_size: int = 1, ): assert buffer_size % num_envs == 0, "buffer_size must be divisible by the number of envs (parallels)" self.agent_keys = agent_keys self.state_space = state_space self.observation_space = observation_space self.action_space = action_space self.n_envs = num_envs self.buffer_size = buffer_size self.n_size = self.buffer_size // self.n_envs # Buffer size per env self.ptr = 0 # last data pointer self.size = 0 # current buffer size per environment. @property def full(self): return self.size >= self.n_size
[docs] @abstractmethod def store(self, *args, **kwargs): raise NotImplementedError
[docs] @abstractmethod def clear(self, *args): raise NotImplementedError
[docs] @abstractmethod def sample(self, *args): raise NotImplementedError
[docs] @abstractmethod def finish_path(self, *args, **kwargs): raise NotImplementedError
[docs] class MARL_OnPolicyBuffer(BaseBuffer): """ Replay buffer for on-policy MARL algorithms. Args: agent_keys (List[str]): Keys that identify each agent. state_space (Dict[str, Space]): Global state space, type: Discrete, Box. obs_space (Dict[str, Dict[str, Space]]): Observation space for one agent (suppose same obs space for group agents). act_space (Dict[str, Dict[str, Space]]): Action space for one agent (suppose same actions space for group agents). n_envs (int): Number of parallel environments. buffer_size (int): Buffer size of total experience data. use_gae (bool): Whether to use GAE trick. use_advnorm (bool): Whether to use Advantage normalization trick. gamma (float): Discount factor. gae_lam (float): gae lambda. **kwargs: Other arguments. Examples: >>> state_space=None >>> obs_space={'agent_0': Box(-inf, inf, (18,), float32), ... 'agent_1': Box(-inf, inf, (18,), float32), ... 'agent_2': Box(-inf, inf, (18,), float32)}, >>> act_space={'agent_0': Box(0.0, 1.0, (5,), float32), ... 'agent_1': Box(0.0, 1.0, (5,), float32), ... 'agent_2': Box(0.0, 1.0, (5,), float32)}, >>> n_envs=16, >>> buffer_size=1600, >>> agent_keys=['agent_0', 'agent_1', 'agent_2'], >>> memory = MARL_OffPolicyBuffer(agent_keys=agent_keys, state_space=state_space, obs_space=obs_space, ... act_space=act_space, n_envs=n_envs, buffer_size=buffer_size, ... use_gae=False, use_advnorm=False, gamma=0.99, gae_lam=0.95) """ def __init__(self, agent_keys: List[str], state_space: Dict[str, Space] = None, obs_space: Dict[str, Dict[str, Space]] = None, act_space: Dict[str, Dict[str, Space]] = None, n_envs: int = 1, buffer_size: int = 1, use_gae: Optional[bool] = False, use_advnorm: Optional[bool] = False, gamma: Optional[float] = None, gae_lam: Optional[float] = None, **kwargs): super(MARL_OnPolicyBuffer, self).__init__(agent_keys, state_space, obs_space, act_space, n_envs, buffer_size) self.n_size = buffer_size // self.n_envs self.store_global_state = False if self.state_space is None else True self.use_actions_mask = kwargs['use_actions_mask'] if 'use_actions_mask' in kwargs else False self.avail_actions_shape = kwargs['avail_actions_shape'] if 'avail_actions_shape' in kwargs else None self.use_gae = use_gae self.use_advantage_norm = use_advnorm self.gamma, self.gae_lambda = gamma, gae_lam # prepare an empty buffer to store data self.data, self.start_ids = {}, None self.reward_space = {key: () for key in self.agent_keys} self.returns = {key: () for key in self.agent_keys} self.values = {key: () for key in self.agent_keys} self.log_pi_old = {key: () for key in self.agent_keys} self.advantages = {key: () for key in self.agent_keys} self.terminal_space = {key: () for key in self.agent_keys} self.agent_mask_space = {key: () for key in self.agent_keys} self.clear() self.data_keys = self.data.keys()
[docs] def clear(self): """ Clears the memory data in the replay buffer. Examples: An example shows the data shape (``n_env=16``, ``buffer_size=1600``, ``agent_keys=['agent_0', 'agent_1', 'agent_2']``): .. code-block:: python self.data = { 'obs': { 'agent_0': shape=[16, 100, 18], 'agent_1': shape=[16, 100, 18], 'agent_2': shape=[16, 100, 18], }, # dim_obs: 18 'actions': { 'agent_0': shape=[16, 100, 5], 'agent_1': shape=[16, 100, 5], 'agent_2': shape=[16, 100, 5], }, # dim_act: 5 ... } """ self.data = { 'obs': create_memory(space2shape(self.observation_space), self.n_envs, self.n_size), 'actions': create_memory(space2shape(self.action_space), self.n_envs, self.n_size), 'rewards': create_memory(self.reward_space, self.n_envs, self.n_size), 'returns': create_memory(self.reward_space, self.n_envs, self.n_size), 'values': create_memory(self.reward_space, self.n_envs, self.n_size), 'log_pi_old': create_memory(self.reward_space, self.n_envs, self.n_size), 'advantages': create_memory(self.reward_space, self.n_envs, self.n_size), 'terminals': create_memory(self.terminal_space, self.n_envs, self.n_size, np.bool_), 'agent_mask': create_memory(self.agent_mask_space, self.n_envs, self.n_size, np.bool_), } if self.store_global_state: self.data.update({ 'state': create_memory(space2shape(self.state_space), self.n_envs, self.n_size) }) if self.use_actions_mask: self.data.update({ "avail_actions": create_memory(self.avail_actions_shape, self.n_envs, self.n_size, np.bool_), }) self.ptr, self.size = 0, 0 self.start_ids = np.zeros(self.n_envs, np.int64) # the start index of the last episode for each env.
[docs] def store(self, **step_data): """ Stores a step of data into the replay buffer. """ for data_key, data_value in step_data.items(): if data_key in ['state']: self.data[data_key][:, self.ptr] = data_value continue for agt_key in self.agent_keys: self.data[data_key][agt_key][:, self.ptr] = data_value[agt_key] self.ptr = (self.ptr + 1) % self.n_size self.size = np.min([self.size + 1, self.n_size])
[docs] def finish_path(self, i_env: Optional[int] = None, value_next: Optional[dict] = None, value_normalizer=None): """ Calculates and stores the returns and advantages when an episode is finished. Parameters: i_env (int): The index of environment. value_next (dict): The critic values of the terminal state. value_normalizer: The value normalizer method, default is None. """ if self.size == 0: return if self.full: path_slice = np.arange(self.start_ids[i_env], self.n_size).astype(np.int32) else: path_slice = np.arange(self.start_ids[i_env], self.ptr).astype(np.int32) # calculate advantages and returns use_value_norm = False if (value_normalizer is None) else True use_parameter_sharing = False if use_value_norm: if value_normalizer.keys() != set(self.agent_keys): use_parameter_sharing = True for key in self.agent_keys: rewards = self.data['rewards'][key][i_env, path_slice] vs = np.append(self.data['values'][key][i_env, path_slice], [value_next[key]], axis=0) dones = self.data['terminals'][key][i_env, path_slice] returns = np.zeros_like(rewards) last_gae_lam = 0 step_nums = len(path_slice) key_vn = self.agent_keys[0] if use_parameter_sharing else key if self.use_gae: for t in reversed(range(step_nums)): if use_value_norm: vs_t = value_normalizer[key_vn].denormalize(vs[t]).item() vs_next = value_normalizer[key_vn].denormalize(vs[t + 1]).item() else: vs_t, vs_next = vs[t], vs[t + 1] delta = rewards[t] + (1 - dones[t]) * self.gamma * vs_next - vs_t last_gae_lam = delta + (1 - dones[t]) * self.gamma * self.gae_lambda * last_gae_lam returns[t] = last_gae_lam + vs_t advantages = returns - value_normalizer[key_vn].denormalize( vs[:-1]) if use_value_norm else returns - vs[:-1] else: returns_ = np.append(returns, [value_next[key]], axis=0) for t in reversed(range(step_nums)): returns_[t] = rewards[t] + (1 - dones[t]) * self.gamma * returns_[t + 1] advantages = returns_ - value_normalizer[key_vn].denormalize(vs) if use_value_norm else returns_ - vs advantages = advantages[:-1] returns = returns_[:-1] self.data['returns'][key][i_env, path_slice] = returns self.data['advantages'][key][i_env, path_slice] = advantages self.start_ids[i_env] = self.ptr
[docs] def sample(self, indexes: Optional[np.ndarray] = None): """ Samples a batch of data from the replay buffer. Parameters: indexes (int): The indexes of the data in the buffer that will be sampled. Returns: samples_dict (dict): The sampled data. """ assert self.full, "Not enough transitions for on-policy buffer to random sample." samples_dict = {} env_choices, step_choices = divmod(indexes, self.n_size) for data_key in self.data_keys: if data_key == "advantages": adv_batch_dict = {} for agt_key in self.agent_keys: adv_batch = self.data[data_key][agt_key][env_choices, step_choices] if self.use_advantage_norm: adv_batch_dict[agt_key] = (adv_batch - np.mean(adv_batch)) / (np.std(adv_batch) + 1e-8) else: adv_batch_dict[agt_key] = adv_batch samples_dict[data_key] = adv_batch_dict elif data_key == "state": samples_dict[data_key] = self.data[data_key][env_choices, step_choices] else: samples_dict[data_key] = {k: self.data[data_key][k][env_choices, step_choices] for k in self.agent_keys} samples_dict['batch_size'] = len(indexes) return samples_dict
[docs] class MARL_OnPolicyBuffer_RNN(MARL_OnPolicyBuffer): """ Replay buffer for on-policy MARL algorithms with DRQN trick. Args: agent_keys (List[str]): Keys that identify each agent. state_space (Dict[str, Space]): Global state space, type: Discrete, Box. obs_space (Dict[str, Dict[str, Space]]): Observation space for one agent (suppose same obs space for group agents). act_space (Dict[str, Dict[str, Space]]): Action space for one agent (suppose same actions space for group agents). n_envs (int): Number of parallel environments. buffer_size (int): Buffer size of total experience data. max_episode_steps (int): The sequence length of each episode data. use_gae (bool): Whether to use GAE trick. use_advnorm (bool): Whether to use Advantage normalization trick. gamma (float): Discount factor. gae_lam (float): gae lambda. **kwargs: Other arguments. Examples: >>> state_space=None >>> obs_space={'agent_0': Box(-inf, inf, (18,), float32), ... 'agent_1': Box(-inf, inf, (18,), float32), ... 'agent_2': Box(-inf, inf, (18,), float32)}, >>> act_space={'agent_0': Box(0.0, 1.0, (5,), float32), ... 'agent_1': Box(0.0, 1.0, (5,), float32), ... 'agent_2': Box(0.0, 1.0, (5,), float32)}, >>> n_envs=16, >>> buffer_size=1600, >>> agent_keys=['agent_0', 'agent_1', 'agent_2'], >>> max_episode_steps = 100 >>> memory = MARL_OffPolicyBuffer(agent_keys=agent_keys, state_space=state_space, obs_space=obs_space, ... act_space=act_space, n_envs=n_envs, buffer_size=buffer_size, ... max_episode_steps=max_episode_steps, ... use_gae=False, use_advnorm=False, gamma=0.99, gae_lam=0.95) """ def __init__(self, agent_keys: List[str], state_space: Dict[str, Space] = None, obs_space: Dict[str, Dict[str, Space]] = None, act_space: Dict[str, Dict[str, Space]] = None, n_envs: int = 1, buffer_size: int = 1, max_episode_steps: int = 1, use_gae: Optional[bool] = False, use_advnorm: Optional[bool] = False, gamma: Optional[float] = None, gae_lam: Optional[float] = None, **kwargs): self.max_eps_len = max_episode_steps self.n_actions = kwargs['n_actions'] if 'n_actions' in kwargs else None self.obs_shape = {k: space2shape(obs_space[k]) for k in agent_keys} self.act_shape = {k: space2shape(act_space[k]) for k in agent_keys} super(MARL_OnPolicyBuffer_RNN, self).__init__(agent_keys, state_space, obs_space, act_space, n_envs, buffer_size, use_gae, use_advnorm, gamma, gae_lam, **kwargs) self.episode_data = {} self.clear_episodes() @property def full(self): return self.size >= self.buffer_size
[docs] def clear(self): """ Clear all buffer data in the on-policy replay buffer. This method resets all stored observations, actions, rewards, values, and other related fields to zero. Args: None Returns: None """ self.data = { 'obs': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.obs_shape[k], np.float32) for k in self.agent_keys}, 'actions': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.act_shape[k], np.float32) for k in self.agent_keys}, 'rewards': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'returns': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'values': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'advantages': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'log_pi_old': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'terminals': {k: np.zeros((self.buffer_size, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'agent_mask': {k: np.zeros((self.buffer_size, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'filled': np.zeros((self.buffer_size, self.max_eps_len), np.bool_) } if self.store_global_state: self.data.update({ 'state': np.zeros((self.buffer_size, self.max_eps_len) + self.state_space.shape, np.float32) }) if self.use_actions_mask: self.data.update({ 'avail_actions': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.avail_actions_shape[k], dtype=np.bool_) for k in self.agent_keys} }) self.ptr, self.size = 0, 0
[docs] def clear_episodes(self): self.episode_data = { 'obs': {k: np.zeros((self.n_envs, self.max_eps_len) + self.obs_shape[k], np.float32) for k in self.agent_keys}, 'actions': {k: np.zeros((self.n_envs, self.max_eps_len) + self.act_shape[k], np.float32) for k in self.agent_keys}, 'rewards': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'returns': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'values': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'advantages': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'log_pi_old': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'terminals': {k: np.zeros((self.n_envs, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'agent_mask': {k: np.zeros((self.n_envs, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'filled': np.zeros((self.n_envs, self.max_eps_len), np.bool_) } if self.store_global_state: self.episode_data.update({ 'state': np.zeros((self.n_envs, self.max_eps_len) + self.state_space.shape, np.float32) }) if self.use_actions_mask: self.episode_data.update({ 'avail_actions': {k: np.zeros((self.n_envs, self.max_eps_len) + self.avail_actions_shape[k], dtype=np.bool_) for k in self.agent_keys} })
[docs] def store(self, **step_data): """ Stores a step of data for each environment. Parameters: step_data (dict): A dict of step data that to be stored into self.episode_data. """ envs_step = step_data['episode_steps'] envs_choice = range(self.n_envs) self.episode_data["filled"][envs_choice, envs_step] = True for data_key, data_value in step_data.items(): if data_key == "episode_steps": continue if data_key == 'state': self.episode_data[data_key][envs_choice, envs_step] = data_value continue for agt_key in self.agent_keys: self.episode_data[data_key][agt_key][envs_choice, envs_step] = data_value[agt_key]
[docs] def store_episodes(self, i_env): """ Stores the episode of data for ith environment into the self.data. Parameters: i_env (int): The ith environment. """ for data_key in self.data_keys: if data_key == "filled": self.data["filled"][self.ptr] = self.episode_data["filled"][i_env].copy() continue if data_key in ['state']: self.data[data_key][self.ptr] = self.episode_data[data_key][i_env].copy() continue for agt_key in self.agent_keys: self.data[data_key][agt_key][self.ptr] = self.episode_data[data_key][agt_key][i_env].copy() self.ptr = (self.ptr + 1) % self.buffer_size self.size = np.min([self.size + 1, self.buffer_size]) # clear the filled values for ith env. self.episode_data['filled'][i_env] = np.zeros(self.max_eps_len, dtype=np.bool_)
[docs] def finish_path(self, i_env: Optional[int] = None, i_step: Optional[int] = None, value_next: Optional[dict] = None, value_normalizer: Optional[dict] = None): """ Calculates and stores the returns and advantages when an episode is finished. Parameters: i_env (int): The index of environment. i_step (int): The index of step for current environment. value_next (Optional[dict]): The critic values of the terminal state. value_normalizer (Optional[dict]): The value normalizer method, default is None. """ env_step = i_step if i_step < self.max_eps_len else self.max_eps_len path_slice = np.arange(0, env_step).astype(np.int32) # calculate advantages and returns use_value_norm = False if (value_normalizer is None) else True use_parameter_sharing = False if use_value_norm: if value_normalizer.keys() != set(self.agent_keys): use_parameter_sharing = True for key in self.agent_keys: rewards = np.array(self.episode_data['rewards'][key][i_env, path_slice]) vs = np.append(np.array(self.episode_data['values'][key][i_env, path_slice]), [value_next[key]], axis=0) dones = np.array(self.episode_data['terminals'][key][i_env, path_slice]) returns = np.zeros_like(rewards) last_gae_lam = 0 step_nums = len(path_slice) key_vn = self.agent_keys[0] if use_parameter_sharing else key if self.use_gae: for t in reversed(range(step_nums)): if use_value_norm: vs_t = value_normalizer[key_vn].denormalize(vs[t]).item() vs_next = value_normalizer[key_vn].denormalize(vs[t + 1]).item() else: vs_t, vs_next = vs[t], vs[t + 1] delta = rewards[t] + (1 - dones[t]) * self.gamma * vs_next - vs_t last_gae_lam = delta + (1 - dones[t]) * self.gamma * self.gae_lambda * last_gae_lam returns[t] = last_gae_lam + vs_t advantages = returns - value_normalizer[key_vn].denormalize( vs[:-1]) if use_value_norm else returns - vs[:-1] else: returns_ = np.append(returns, [value_next[key]], axis=0) for t in reversed(range(step_nums)): returns_[t] = rewards[t] + (1 - dones[t]) * self.gamma * returns_[t + 1] advantages = returns_ - value_normalizer[key_vn].denormalize(vs) if use_value_norm else returns_ - vs advantages = advantages[:-1] returns = returns_[:-1] self.episode_data['returns'][key][i_env, path_slice] = returns self.episode_data['advantages'][key][i_env, path_slice] = advantages self.store_episodes(i_env)
[docs] def sample(self, indexes: Optional[np.ndarray] = None): """ Samples a batch of data from the replay buffer. Parameters: indexes (int): The indexes of the data in the buffer that will be sampled. Returns: samples_dict (dict): The sampled data. """ assert self.full, "Not enough transitions for on-policy buffer to random sample" episode_choices = indexes samples_dict = {} for data_key in self.data_keys: if data_key == "filled": samples_dict["filled"] = self.data['filled'][episode_choices] continue if data_key in ['state', 'state_next']: samples_dict[data_key] = self.data[data_key][episode_choices] continue samples_dict[data_key] = {k: self.data[data_key][k][episode_choices] for k in self.agent_keys} samples_dict['batch_size'] = len(indexes) samples_dict['sequence_length'] = self.max_eps_len return samples_dict
[docs] class IC3Net_OnPolicyBuffer_RNN(MARL_OnPolicyBuffer_RNN): def __init__(self, agent_keys: List[str], state_space: Dict[str, Space] = None, obs_space: Dict[str, Dict[str, Space]] = None, act_space: Dict[str, Dict[str, Space]] = None, n_envs: int = 1, buffer_size: int = 1, max_episode_steps: int = 1, use_gae: Optional[bool] = False, use_advnorm: Optional[bool] = False, gamma: Optional[float] = None, gae_lam: Optional[float] = None, **kwargs): super(IC3Net_OnPolicyBuffer_RNN, self).__init__(agent_keys, state_space, obs_space, act_space, n_envs, buffer_size, max_episode_steps, use_gae, use_advnorm, gamma, gae_lam, **kwargs)
[docs] def clear(self): """ Clear all buffer data in the on-policy replay buffer. This method resets all stored observations, actions, rewards, values, and other related fields to zero. Args: None Returns: None """ self.data = { 'obs': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.obs_shape[k], np.float32) for k in self.agent_keys}, 'actions': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.act_shape[k], np.float32) for k in self.agent_keys}, 'rewards': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'returns': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'values': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'advantages': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'gate_log_pi_old': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'log_pi_old': {k: np.zeros((self.buffer_size, self.max_eps_len), np.float32) for k in self.agent_keys}, 'terminals': {k: np.zeros((self.buffer_size, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'agent_mask': {k: np.zeros((self.buffer_size, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'filled': np.zeros((self.buffer_size, self.max_eps_len), np.bool_) } if self.store_global_state: self.data.update({ 'state': np.zeros((self.buffer_size, self.max_eps_len) + self.state_space.shape, np.float32) }) if self.use_actions_mask: self.data.update({ 'avail_actions': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.avail_actions_shape[k], dtype=np.bool_) for k in self.agent_keys} }) self.ptr, self.size = 0, 0
[docs] def clear_episodes(self): self.episode_data = { 'obs': {k: np.zeros((self.n_envs, self.max_eps_len) + self.obs_shape[k], np.float32) for k in self.agent_keys}, 'actions': {k: np.zeros((self.n_envs, self.max_eps_len) + self.act_shape[k], np.float32) for k in self.agent_keys}, 'rewards': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'returns': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'values': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'advantages': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'gate_log_pi_old': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'log_pi_old': {k: np.zeros((self.n_envs, self.max_eps_len), np.float32) for k in self.agent_keys}, 'terminals': {k: np.zeros((self.n_envs, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'agent_mask': {k: np.zeros((self.n_envs, self.max_eps_len), np.bool_) for k in self.agent_keys}, 'filled': np.zeros((self.n_envs, self.max_eps_len), np.bool_) } if self.store_global_state: self.episode_data.update({ 'state': np.zeros((self.n_envs, self.max_eps_len) + self.state_space.shape, np.float32) }) if self.use_actions_mask: self.episode_data.update({ 'avail_actions': {k: np.zeros((self.n_envs, self.max_eps_len) + self.avail_actions_shape[k], dtype=np.bool_) for k in self.agent_keys} })
[docs] class MeanField_OnPolicyBuffer(MARL_OnPolicyBuffer): """ Replay buffer for Mean Field Actor-Critic algorithm. """ def __init__(self, *args, **kwargs): self.n_actions_max = kwargs.get('n_actions_max') self.prob_space = (self.n_actions_max,) super(MeanField_OnPolicyBuffer, self).__init__(*args, **kwargs)
[docs] def clear(self): super().clear() self.data["actions_mean"] = {k: create_memory(space2shape(self.prob_space), self.n_envs, self.n_size) for k in self.agent_keys}
[docs] class MeanField_OnPolicyBuffer_RNN(MARL_OnPolicyBuffer_RNN): def __init__(self, *args, **kwargs): self.n_actions_max = kwargs.get('n_actions_max') self.prob_space = (self.n_actions_max,) super(MeanField_OnPolicyBuffer_RNN, self).__init__(*args, **kwargs)
[docs] def clear(self): super().clear() self.data['actions_mean'] = {k: np.zeros((self.buffer_size, self.max_eps_len) + self.prob_space, dtype=np.float32) for k in self.agent_keys}
[docs] def clear_episodes(self): super().clear_episodes() self.episode_data['actions_mean'] = {k: np.zeros((self.n_envs, self.max_eps_len) + self.prob_space, dtype=np.float32) for k in self.agent_keys}
[docs] class MARL_OffPolicyBuffer(BaseBuffer): """ Replay buffer for off-policy MARL algorithms. Args: agent_keys (List[str]): Keys that identify each agent. state_space (Dict[str, Space]): Global state space, type: Discrete, Box. obs_space (Dict[str, Dict[str, Space]]): Observation space for one agent (suppose same obs space for group agents). act_space (Dict[str, Dict[str, Space]]): Action space for one agent (suppose same actions space for group agents). n_envs (int): Number of parallel environments. buffer_size (int): Buffer size of total experience data. batch_size (int): Batch size of transition data for a sample. **kwargs: Other arguments. Examples: >>> state_space=None >>> obs_space={'agent_0': Box(-inf, inf, (18,), float32), ... 'agent_1': Box(-inf, inf, (18,), float32), ... 'agent_2': Box(-inf, inf, (18,), float32)}, >>> act_space={'agent_0': Box(0.0, 1.0, (5,), float32), ... 'agent_1': Box(0.0, 1.0, (5,), float32), ... 'agent_2': Box(0.0, 1.0, (5,), float32)}, >>> n_envs=50, >>> buffer_size=10000, >>> batch_size=256, >>> agent_keys=['agent_0', 'agent_1', 'agent_2'], >>> memory = MARL_OffPolicyBuffer(agent_keys=agent_keys, state_space=state_space, obs_space=obs_space, ... act_space=act_space, n_envs=n_envs, buffer_size=buffer_size, ... batch_size=batch_size) """ def __init__(self, agent_keys: List[str], state_space: Dict[str, Space] = None, obs_space: Dict[str, Dict[str, Space]] = None, act_space: Dict[str, Dict[str, Space]] = None, n_envs: int = 1, buffer_size: int = 1, batch_size: int = 1, **kwargs): super(MARL_OffPolicyBuffer, self).__init__(agent_keys, state_space, obs_space, act_space, n_envs, buffer_size) self.batch_size = batch_size self.store_global_state = False if self.state_space is None else True self.use_actions_mask = kwargs['use_actions_mask'] if 'use_actions_mask' in kwargs else False self.avail_actions_shape = kwargs['avail_actions_shape'] if 'avail_actions_shape' in kwargs else None self.data = {} self.clear() self.data_keys = self.data.keys()
[docs] def clear(self): """ Clears the memory data in the replay buffer. Examples: An example shows the data shape: .. code-block:: python # (n_env=50, buffer_size=10000, agent_keys=['agent_0', 'agent_1', 'agent_2']) self.data = { 'obs': { 'agent_0': shape=[50, 200, 18], 'agent_1': shape=[50, 200, 18], 'agent_2': shape=[50, 200, 18], }, # dim_obs: 18 'actions': { 'agent_0': shape=[50, 200, 5], 'agent_1': shape=[50, 200, 5], 'agent_2': shape=[50, 200, 5], }, # dim_act: 5 ... } """ reward_space = {key: () for key in self.agent_keys} terminal_space = {key: () for key in self.agent_keys} agent_mask_space = {key: () for key in self.agent_keys} self.data = { 'obs': create_memory(space2shape(self.observation_space), self.n_envs, self.n_size), 'actions': create_memory(space2shape(self.action_space), self.n_envs, self.n_size), 'obs_next': create_memory(space2shape(self.observation_space), self.n_envs, self.n_size), 'rewards': create_memory(reward_space, self.n_envs, self.n_size), 'terminals': create_memory(terminal_space, self.n_envs, self.n_size, np.bool_), 'agent_mask': create_memory(agent_mask_space, self.n_envs, self.n_size, np.bool_), } if self.store_global_state: self.data.update({ 'state': create_memory(space2shape(self.state_space), self.n_envs, self.n_size), 'state_next': create_memory(space2shape(self.state_space), self.n_envs, self.n_size) }) if self.use_actions_mask: self.data.update({ "avail_actions": create_memory(self.avail_actions_shape, self.n_envs, self.n_size, np.bool_), "avail_actions_next": create_memory(self.avail_actions_shape, self.n_envs, self.n_size, np.bool_) }) self.ptr, self.size = 0, 0
[docs] def store(self, **step_data): """ Stores a step of data into the replay buffer. """ for data_key, data_values in step_data.items(): if data_key in ['state', 'state_next']: self.data[data_key][:, self.ptr] = data_values continue for agt_key in self.agent_keys: self.data[data_key][agt_key][:, self.ptr] = data_values[agt_key] self.ptr = (self.ptr + 1) % self.n_size self.size = np.min([self.size + 1, self.n_size])
[docs] def sample(self, batch_size=None): """ Samples a batch of data from the replay buffer. Parameters: batch_size (int): The size of the batch data to be sampled. Returns: samples_dict (dict): The sampled data. """ assert self.size > 0, "Not enough transitions for off-policy buffer to random sample." if batch_size is None: batch_size = self.batch_size env_choices = np.random.choice(self.n_envs, batch_size) step_choices = np.random.choice(self.size, batch_size) samples_dict = {} for data_key in self.data_keys: if data_key in ['state', 'state_next']: samples_dict[data_key] = self.data[data_key][env_choices, step_choices] continue samples_dict[data_key] = {k: self.data[data_key][k][env_choices, step_choices] for k in self.agent_keys} samples_dict['batch_size'] = batch_size return samples_dict
[docs] def finish_path(self, *args, **kwargs): return
[docs] class MARL_OffPolicyBuffer_RNN(MARL_OffPolicyBuffer): """ Replay buffer for off-policy MARL algorithms with DRQN trick. Args: agent_keys (List[str]): Keys that identify each agent. state_space (Dict[str, Space]): Global state space, type: Discrete, Box. obs_space (Dict[str, Dict[str, Space]]): Observation space for one agent (suppose same obs space for group agents). act_space (Dict[str, Dict[str, Space]]): Action space for one agent (suppose same actions space for group agents). n_envs (int): Number of parallel environments. buffer_size (int): Buffer size of total experience data. batch_size (int): Batch size of episodes for a sample. max_episode_steps (int): The sequence length of each episode data. **kwargs: Other arguments. Examples: >>> state_space=None >>> obs_space={'agent_0': Box(-inf, inf, (18,), float32), ... 'agent_1': Box(-inf, inf, (18,), float32), ... 'agent_2': Box(-inf, inf, (18,), float32)}, >>> act_space={'agent_0': Box(0.0, 1.0, (5,), float32), ... 'agent_1': Box(0.0, 1.0, (5,), float32), ... 'agent_2': Box(0.0, 1.0, (5,), float32)}, >>> n_envs=50, >>> buffer_size=10000, >>> batch_size=256, >>> agent_keys=['agent_0', 'agent_1', 'agent_2'], >>> max_episode_steps=60 >>> memory = MARL_OffPolicyBuffer_RNN(agent_keys=agent_keys, state_space=state_space, ... obs_space=obs_space, act_space=act_space, ... n_envs=n_envs, buffer_size=buffer_size, batch_size=batch_size, ... max_episode_steps=max_episode_steps) """ def __init__(self, agent_keys: List[str], state_space: Dict[str, Space] = None, obs_space: Dict[str, Dict[str, Space]] = None, act_space: Dict[str, Dict[str, Space]] = None, n_envs: int = 1, buffer_size: int = 1, batch_size: int = 1, max_episode_steps: int = 1, **kwargs): self.max_eps_len = max_episode_steps self.obs_shape = {k: space2shape(obs_space[k]) for k in agent_keys} self.act_shape = {k: space2shape(act_space[k]) for k in agent_keys} super(MARL_OffPolicyBuffer_RNN, self).__init__(agent_keys, state_space, obs_space, act_space, n_envs, buffer_size, batch_size, **kwargs) self.episode_data = {} self.clear_episodes() self.episode_data_keys = self.episode_data.keys()
[docs] def clear(self): """ Clears the memory data in the replay buffer. Examples: An example shows the data shape (``buffer_size=10000``, ``max_eps_len=60``, ``agent_keys=['agent_0', 'agent_1', 'agent_2']``): .. code-block:: python self.data = { 'obs': { 'agent_0': shape=[10000, 61, 18], 'agent_1': shape=[10000, 61, 18], 'agent_2': shape=[10000, 61, 18], }, # dim_obs: 18 'actions': { 'agent_0': shape=[10000, 60, 5], 'agent_1': shape=[10000, 60, 5], 'agent_2': shape=[10000, 60, 5], }, # dim_act: 5 ... 'filled': shape=[10000, 60], # Step mask values. True means current step is not terminated. } """ self.data = { 'obs': {k: np.zeros((self.buffer_size, self.max_eps_len + 1) + self.obs_shape[k], dtype=np.float32) for k in self.agent_keys}, 'actions': {k: np.zeros((self.buffer_size, self.max_eps_len) + self.act_shape[k], dtype=np.float32) for k in self.agent_keys}, 'rewards': {k: np.zeros((self.buffer_size, self.max_eps_len), dtype=np.float32) for k in self.agent_keys}, 'terminals': {k: np.zeros((self.buffer_size, self.max_eps_len), dtype=np.bool_) for k in self.agent_keys}, 'agent_mask': {k: np.zeros((self.buffer_size, self.max_eps_len), dtype=np.bool_) for k in self.agent_keys}, 'filled': np.zeros((self.buffer_size, self.max_eps_len), dtype=np.bool_), } if self.store_global_state: state_shape = (self.buffer_size, self.max_eps_len + 1) + space2shape(self.state_space) self.data.update({'state': np.zeros(state_shape, dtype=np.float32)}) if self.use_actions_mask: self.data.update({ 'avail_actions': {k: np.zeros((self.buffer_size, self.max_eps_len + 1) + self.avail_actions_shape[k], dtype=np.bool_) for k in self.agent_keys}}) self.ptr, self.size = 0, 0
[docs] def clear_episodes(self): """ Clears an episode of data for multiple environments in the replay buffer. Examples: An example shows the data shape (``n_envs=16``, ``max_eps_len=60``, ``agent_keys=['agent_0', 'agent_1', 'agent_2']``): .. code-block:: python self.data = { 'obs': { 'agent_0': shape=[16, 61, 18], 'agent_1': shape=[16, 61, 18], 'agent_2': shape=[16, 61, 18], }, # dim_obs: 18 'actions': { 'agent_0': shape=[16, 60, 5], 'agent_1': shape=[16, 60, 5], 'agent_2': shape=[16, 60, 5], }, # dim_act: 5 ... 'filled': shape=[16, 60], # Step mask values. True means current step is not terminated. } """ self.episode_data = { 'obs': {k: np.zeros((self.n_envs, self.max_eps_len + 1) + self.obs_shape[k], dtype=np.float32) for k in self.agent_keys}, 'actions': {k: np.zeros((self.n_envs, self.max_eps_len) + self.act_shape[k], dtype=np.float32) for k in self.agent_keys}, 'rewards': {k: np.zeros((self.n_envs, self.max_eps_len), dtype=np.float32) for k in self.agent_keys}, 'terminals': {k: np.zeros((self.n_envs, self.max_eps_len), dtype=np.bool_) for k in self.agent_keys}, 'agent_mask': {k: np.zeros((self.n_envs, self.max_eps_len), dtype=np.bool_) for k in self.agent_keys}, 'filled': np.zeros((self.n_envs, self.max_eps_len), dtype=np.bool_), } if self.store_global_state: state_shape = (self.n_envs, self.max_eps_len + 1) + space2shape(self.state_space) self.episode_data.update({'state': np.zeros(state_shape, dtype=np.float32)}) if self.use_actions_mask: self.episode_data.update({ "avail_actions": {k: np.zeros((self.n_envs, self.max_eps_len + 1) + self.avail_actions_shape[k], dtype=np.bool_) for k in self.agent_keys}})
[docs] def store(self, **step_data): """ Stores a step of data for each environment. Parameters: step_data (dict): A dict of step data that to be stored into self.episode_data. """ envs_step = step_data['episode_steps'] envs_choice = range(self.n_envs) self.episode_data["filled"][envs_choice, envs_step] = True for data_key, data_value in step_data.items(): if data_key not in self.episode_data: continue if data_key in ['state', 'state_next']: self.episode_data[data_key][envs_choice, envs_step] = data_value continue for agt_key in self.agent_keys: self.episode_data[data_key][agt_key][envs_choice, envs_step] = data_value[agt_key]
[docs] def store_episodes(self, i_env): """ Stores the episode of data for ith environment into the self.data. Parameters: i_env (int): The ith environment. """ for data_key in self.data_keys: if data_key == "filled": self.data["filled"][self.ptr] = self.episode_data["filled"][i_env].copy() continue if data_key in ['state', 'state_next']: self.data[data_key][self.ptr] = self.episode_data[data_key][i_env].copy() continue for agt_key in self.agent_keys: self.data[data_key][agt_key][self.ptr] = self.episode_data[data_key][agt_key][i_env].copy() self.ptr = (self.ptr + 1) % self.buffer_size self.size = np.min([self.size + 1, self.buffer_size]) # clear the filled values for ith env. self.episode_data['filled'][i_env] = np.zeros(self.max_eps_len, dtype=np.bool_)
[docs] def finish_path(self, i_env, **terminal_data): """ Address the terminal states, including store the terminal observations, avail_actions, and others. Parameters: i_env (int): The i-th environment. terminal_data (dict): The terminal states. """ env_step = terminal_data['episode_step'] # Store terminal data into self.episode_data. if self.store_global_state: self.episode_data['state'][i_env, env_step] = terminal_data['state'] for agt_key in self.agent_keys: self.episode_data['obs'][agt_key][i_env, env_step] = terminal_data['obs'][agt_key] if self.use_actions_mask: self.episode_data['avail_actions'][agt_key][i_env, env_step] = terminal_data['avail_actions'][agt_key] # Store the episode data of ith env into self.data. self.store_episodes(i_env)
[docs] def sample(self, batch_size=None): """ Samples a batch of data for model training. Parameters: batch_size (int): The size of the data batch, default is self.batch_size (recommended). Returns: samples_dict (dict): A dict of sampled data. """ assert self.size > 0, "You need to first store experience data into the buffer!" if batch_size is None: batch_size = self.batch_size episode_choices = np.random.choice(self.size, batch_size) samples_dict = {} for data_key in self.data_keys: if data_key == "filled": samples_dict["filled"] = self.data['filled'][episode_choices] continue if data_key in ['state', 'state_next']: samples_dict[data_key] = self.data[data_key][episode_choices] continue samples_dict[data_key] = {k: self.data[data_key][k][episode_choices] for k in self.agent_keys} samples_dict['batch_size'] = batch_size samples_dict['sequence_length'] = self.max_eps_len return samples_dict
[docs] class MeanField_OffPolicyBuffer(MARL_OffPolicyBuffer): """ Replay buffer for off-policy Mean-Field MARL algorithms (Mean-Field Q-Learning). Args: agent_keys (List[str]): Keys that identify each agent. state_space (Dict[str, Space]): Global state space, type: Discrete, Box. obs_space (Dict[str, Dict[str, Space]]): Observation space for one agent (suppose same obs space for group agents). act_space (Dict[str, Dict[str, Space]]): Action space for one agent (suppose same actions space for group agents). n_envs (int): Number of parallel environments. buffer_size (int): Buffer size of total experience data. batch_size (int): Batch size of transition data for a sample. **kwargs: Other arguments. Examples: >>> state_space=None >>> obs_space={'agent_0': Box(-inf, inf, (18,), float32), ... 'agent_1': Box(-inf, inf, (18,), float32), ... 'agent_2': Box(-inf, inf, (18,), float32)}, >>> act_space={'agent_0': Box(0.0, 1.0, (5,), float32), ... 'agent_1': Box(0.0, 1.0, (5,), float32), ... 'agent_2': Box(0.0, 1.0, (5,), float32)}, >>> n_envs=50, >>> buffer_size=10000, >>> batch_size=256, >>> agent_keys=['agent_0', 'agent_1', 'agent_2'], >>> memory = MARL_OffPolicyBuffer(agent_keys=agent_keys, state_space=state_space, obs_space=obs_space, ... act_space=act_space, n_envs=n_envs, buffer_size=buffer_size, ... batch_size=batch_size) """ def __init__(self, agent_keys: List[str], state_space: Dict[str, Space] = None, obs_space: Dict[str, Dict[str, Space]] = None, act_space: Dict[str, Dict[str, Space]] = None, n_envs: int = 1, buffer_size: int = 1, batch_size: int = 1, **kwargs): self.n_actions_max = kwargs.get('n_actions_max') self.prob_space = (self.n_actions_max,) super(MeanField_OffPolicyBuffer, self).__init__(agent_keys, state_space, obs_space, act_space, n_envs, buffer_size, batch_size, **kwargs)
[docs] def clear(self): super().clear() self.data["actions_mean"] = {k: create_memory(space2shape(self.prob_space), self.n_envs, self.n_size) for k in self.agent_keys} self.data["actions_mean_next"] = {k: create_memory(space2shape(self.prob_space), self.n_envs, self.n_size) for k in self.agent_keys}
[docs] class MeanField_OffPolicyBuffer_RNN(MARL_OffPolicyBuffer_RNN): def __init__(self, *args, **kwargs) -> None: self.n_actions_max = kwargs.get('n_actions_max') self.prob_space = (self.n_actions_max,) super(MeanField_OffPolicyBuffer_RNN, self).__init__(*args, **kwargs)
[docs] def clear(self): """ Clear all buffer data in the on-policy replay buffer. This method resets all stored observations, actions, rewards, values, and other related fields to zero. Args: None Returns: None """ super().clear() self.data['actions_mean'] = {k: np.zeros((self.buffer_size, self.max_eps_len + 1) + self.prob_space, dtype=np.float32) for k in self.agent_keys}
[docs] def clear_episodes(self): super().clear_episodes() self.episode_data['actions_mean'] = {k: np.zeros((self.n_envs, self.max_eps_len + 1) + self.prob_space, dtype=np.float32) for k in self.agent_keys}
[docs] def finish_path(self, i_env, **terminal_data): """ Address the terminal states, including store the terminal observations, avail_actions, and others. Parameters: i_env (int): The i-th environment. terminal_data (dict): The terminal states. """ env_step = terminal_data['episode_step'] # Store terminal data into self.episode_data. if self.store_global_state: self.episode_data['state'][i_env, env_step] = terminal_data['state'] for agt_key in self.agent_keys: self.episode_data['obs'][agt_key][i_env, env_step] = terminal_data['obs'][agt_key] self.episode_data['actions_mean'][agt_key][i_env, env_step] = terminal_data['actions_mean'][agt_key] if self.use_actions_mask: self.episode_data['avail_actions'][agt_key][i_env, env_step] = terminal_data['avail_actions'][agt_key] # Store the episode data of ith env into self.data. self.store_episodes(i_env)