Source code for xuance.environment.multi_agent_env.drones

"""
gym-pybullet-drones
GitHub: https://github.com/utiasDSL/gym-pybullet-drones.git
Note: The version of Python should be >= 3.10.
"""
import numpy as np
from gymnasium.spaces import Box
import time
from operator import itemgetter
from xuance.environment import RawMultiAgentEnv
try:
    from gym_pybullet_drones.utils.enums import DroneModel, Physics, ActionType, ObservationType
    from gym_pybullet_drones.envs.MultiHoverAviary import MultiHoverAviary as MultiHoverAviary_Official
except ImportError:
    DroneModel, Physics, ActionType, ObservationType = None, None, None, None
    MultiHoverAviary_Official = object


[docs] class MultiHoverAviary(MultiHoverAviary_Official): """Multi-agent RL problem: leader-follower.""" ################################################################################ def __init__(self, drone_model: DroneModel = DroneModel.CF2X if hasattr(DroneModel, 'CF2X') else None, num_drones: int = 2, neighbourhood_radius: float = np.inf, initial_xyzs=None, initial_rpys=None, physics: Physics = Physics.PYB if hasattr(Physics, 'PYB') else None, pyb_freq: int = 240, ctrl_freq: int = 30, gui=False, record=False, obs: ObservationType = ObservationType.KIN if hasattr(ObservationType, 'KIN') else None, act: ActionType = ActionType.RPM if hasattr(ActionType, 'RPM') else None, ): """Initialization of a multi-agent RL environment. Using the generic multi-agent RL superclass. Parameters ---------- drone_model : DroneModel, optional The desired drone type (detailed in an .urdf file in folder `assets`). num_drones : int, optional The desired number of drones in the aviary. neighbourhood_radius : float, optional Radius used to compute the drones' adjacency matrix, in meters. initial_xyzs: ndarray | None, optional (NUM_DRONES, 3)-shaped array containing the initial XYZ position of the drones. initial_rpys: ndarray | None, optional (NUM_DRONES, 3)-shaped array containing the initial orientations of the drones (in radians). physics : Physics, optional The desired implementation of PyBullet physics/custom dynamics. pyb_freq : int, optional The frequency at which PyBullet steps (a multiple of ctrl_freq). ctrl_freq : int, optional The frequency at which the environment steps. gui : bool, optional Whether to use PyBullet's GUI. record : bool, optional Whether to save a video of the simulation. obs : ObservationType, optional The type of observation space (kinematic information or vision) act : ActionType, optional The type of action space (1 or 3D; RPMS, thurst and torques, or waypoint with PID control) """ self.EPISODE_LEN_SEC = 8 super().__init__(drone_model=drone_model, num_drones=num_drones, neighbourhood_radius=neighbourhood_radius, initial_xyzs=initial_xyzs, initial_rpys=initial_rpys, physics=physics, pyb_freq=pyb_freq, ctrl_freq=ctrl_freq, gui=gui, record=record, obs=obs, act=act ) self.TARGET_POS = np.array([[0, 0, 1], [0, 1, 1], [1, 0, 1], [0, 0, 2], [0, 1, 2], [1, 0, 2], [2, 0, 1], [0, 2, 1], [2, 0, 2], [0, 2, 2], ]) self.NUM_TARGETS = self.NUM_DRONES self.space_range_x = [-10.0, 10.0] self.space_range_y = [-10.0, 10.0] self.space_range_z = [0.02, 10.0] self.pose_limit = np.pi - 0.2 ################################################################################ def _computeReward(self): """Computes the current reward value. Returns ------- float The reward. """ states = np.array([self._getDroneStateVector(i) for i in range(self.NUM_DRONES)]) target_pos = self.TARGET_POS[:self.NUM_TARGETS].reshape(self.NUM_TARGETS, 1, 3) current_pos = states[:, :3].reshape(1, self.NUM_DRONES, 3) relative_pos = target_pos - current_pos distance_matrix = np.linalg.norm(relative_pos, axis=-1) reward_team = -distance_matrix.min(axis=-1, keepdims=True).sum() rewards = np.ones([self.NUM_DRONES, 1]) * reward_team for i in range(self.NUM_DRONES): x, y, z = states[i][0], states[i][1], states[i][2] if (max(abs(states[i][7]), abs(states[i][8])) > self.pose_limit) and ( z < self.space_range_z[0] + 0.05): # the drone fulls down rewards[i] -= 10 for j in range(self.NUM_DRONES): # penalize collision with each other if i == j: continue distance_ij = np.linalg.norm(states[i, :3] - states[j, :3]) if distance_ij < 0.1: rewards[i] -= 10 return rewards ################################################################################ def _computeTerminated(self): """Computes the current done value. Returns ------- bool Whether the current episode is done. """ states = np.array([self._getDroneStateVector(i) for i in range(self.NUM_DRONES)]) for i in range(self.NUM_DRONES): x, y, z = states[i][0], states[i][1], states[i][2] if (max(abs(states[i][7]), abs(states[i][8])) > self.pose_limit) and (z < self.space_range_z[0] + 0.05): # The drone is too tilted return True return False ################################################################################ def _computeTruncated(self): """Computes the current truncated value. Returns ------- bool Whether the current episode timed out. """ states = np.array([self._getDroneStateVector(i) for i in range(self.NUM_DRONES)]) for i in range(self.NUM_DRONES): x, y, z = states[i][0], states[i][1], states[i][2] if (x < self.space_range_x[0]) or (x > self.space_range_x[1]) or (y < self.space_range_y[0]) or ( y > self.space_range_y[1]) or (z < self.space_range_z[0]) or ( z > self.space_range_z[1]): # out of range return True if self.step_counter / self.PYB_FREQ > self.EPISODE_LEN_SEC: return True else: return False
REGISTRY = { "MultiHoverAviary": MultiHoverAviary, # you can add your custom scenarios here. }
[docs] class Drones_MultiAgentEnv(RawMultiAgentEnv): def __init__(self, config): super(Drones_MultiAgentEnv, self).__init__() # import scenarios of gym-pybullet-drones self.env_id = config.env_id self.gui = config.render # Note: You cannot render multiple environments in parallel. self.sleep = config.sleep self.env_id = config.env_id kwargs_env = {'gui': self.gui} if self.env_id in ["MultiHoverAviary"]: kwargs_env.update({'num_drones': config.num_drones, 'obs': ObservationType(config.obs_type), 'act': ActionType(config.act_type)}) self.env = REGISTRY[config.env_id](**kwargs_env) self.env.reset(seed=config.env_seed) self.num_agents = config.num_drones self.agents = [f"agent_{i}" for i in range(self.num_agents)] self.state_space = Box(-np.inf, np.inf, shape=[20, ]) obs_shape_i = (self.env.observation_space.shape[-1],) act_shape_i = (self.env.action_space.shape[-1],) self.observation_space = {k: Box(-np.inf, np.inf, obs_shape_i) for k in self.agents} self.action_space = {k: Box(-np.inf, np.inf, act_shape_i, seed=config.env_seed) for k in self.agents} self.max_episode_steps = self.max_cycles = config.max_episode_steps self._episode_step = 0
[docs] def space_reshape(self, gym_space): low = gym_space.low.reshape(-1) high = gym_space.high.reshape(-1) shape_obs = (gym_space.shape[-1],) return Box(low=low, high=high, shape=shape_obs, dtype=gym_space.dtype)
[docs] def close(self): self.env.close()
[docs] def render(self, *args, **kwargs): return np.zeros([2, 2, 2])
[docs] def reset(self): obs, info = self.env.reset() info["episode_step"] = self._episode_step self._episode_step = 0 obs_dict = {k: obs[i] for i, k in enumerate(self.agents)} return obs_dict, info
[docs] def step(self, actions): actions_array = np.array(itemgetter(*self.agents)(actions)) obs, reward, terminated, truncated, info = self.env.step(actions_array) obs_dict = {k: obs[i] for i, k in enumerate(self.agents)} terminated_dict = {k: terminated for i, k in enumerate(self.agents)} rewrds_dict = {k: reward[i, 0] for i, k in enumerate(self.agents)} self._episode_step += 1 truncated = True if (self._episode_step >= self.max_episode_steps) else False info["episode_step"] = self._episode_step # current episode step if self.gui: time.sleep(self.sleep) return obs_dict, rewrds_dict, terminated_dict, truncated, info
[docs] def agent_mask(self): return {agent: True for agent in self.agents} # 1 means available
[docs] def state(self): return self.state_space.sample()
[docs] def avail_actions(self): return