Source code for xuance.environment.single_agent_env.gym

import gymnasium as gym
import numpy as np
from collections import deque
try:
    import cv2
except ImportError:
    print("The module opencv-python might not be installed. "
          "Please ensure you have installed opencv-python via `pip install opencv-python==4.5.4.58`.")


class Gym_Env(gym.Wrapper):
    """Gymnasium environment wrapper that tracks per-episode step count and score.

    The wrapped environment is created with ``gym.make`` and seeded once at
    construction time. ``reset`` and ``step`` add bookkeeping entries to the
    ``info`` dict returned by the wrapped environment.

    Args:
        config: Object exposing the attributes ``env_id`` (str, the id passed
            to ``gym.make``), ``env_seed`` (int, seed for the action space and
            the initial reset) and ``render_mode`` (str, e.g. "rgb_array" or
            "human").
        **kwargs: Extra keyword arguments forwarded to ``gym.make``.
    """

    def __init__(self, config, **kwargs):
        if config.env_id == "CarRacing-v3":
            # Overrides any caller-supplied value for this environment.
            kwargs['continuous'] = False
        self.env = gym.make(config.env_id, render_mode=config.render_mode, **kwargs)
        self.env.action_space.seed(seed=config.env_seed)
        self.env.reset(seed=config.env_seed)
        super(Gym_Env, self).__init__(self.env)
        self.observation_space = self.env.observation_space
        self.action_space = self.env.action_space
        self.metadata = self.env.metadata

        # The private attribute only exists when the outermost wrapper is the
        # time-limit wrapper; fall back to the registered spec, and finally to
        # None, instead of failing construction with an AttributeError.
        max_steps = getattr(self.env, "_max_episode_steps", None)
        if max_steps is None:
            spec = getattr(self.env, "spec", None)
            max_steps = getattr(spec, "max_episode_steps", None)
        self.max_episode_steps = max_steps

        # Initialise the counters here so that step() is safe even if the
        # caller has not called this wrapper's reset() yet (the wrapped env
        # was already reset above).
        self._episode_step = 0
        self._episode_score = 0.0

    def render(self, *args):
        """Render the wrapped environment; positional arguments are ignored."""
        return self.env.render()

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped environment and the episode counters.

        Args:
            seed: Optional seed forwarded to the wrapped environment's reset.
            options: Optional reset options forwarded to the wrapped environment.

        Returns:
            Tuple ``(obs, info)`` where ``info["episode_step"]`` is set to 0.
        """
        obs, info = self.env.reset(seed=seed, options=options)
        self._episode_step = 0
        self._episode_score = 0.0
        info["episode_step"] = self._episode_step
        return obs, info

    def step(self, actions):
        """Advance the wrapped environment by one step.

        Args:
            actions: Action passed unchanged to the wrapped environment.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)`` where
            ``info`` additionally holds ``"episode_step"`` (steps since the last
            reset) and ``"episode_score"`` (sum of rewards since the last reset).
        """
        observation, reward, terminated, truncated, info = self.env.step(actions)
        self._episode_step += 1
        self._episode_score += reward
        info["episode_step"] = self._episode_step
        info["episode_score"] = self._episode_score
        return observation, reward, terminated, truncated, info
class MountainCar(Gym_Env):
    """Gym_Env variant whose observation is the concatenation of the last frames.

    The most recent ``num_stack`` raw observations are kept in a deque and
    returned wrapped in a ``LazyFrames`` object.

    Args:
        env_id (str): The environment id passed to ``gym.make``.
        env_seed (int): The random seed to set the environment.
        render_mode (str): "rgb_array", "human"
    """

    def __init__(self, env_id: str, env_seed: int, render_mode: str):
        # Local import keeps the new dependency inside this block.
        from types import SimpleNamespace

        # Gym_Env.__init__ accepts a single config object (it reads env_id,
        # env_seed and render_mode from it), not three positional arguments.
        config = SimpleNamespace(env_id=env_id, env_seed=env_seed, render_mode=render_mode)
        super(MountainCar, self).__init__(config)
        self.num_stack = 4
        self.frames = deque([], maxlen=self.num_stack)
        # Per-frame bounds repeated once per stacked frame, so the space
        # always matches the concatenated observation.
        frame_low = np.array([-1.2, -0.07], dtype=np.float32)
        frame_high = np.array([0.6, 0.07], dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=np.tile(frame_low, self.num_stack),
            high=np.tile(frame_high, self.num_stack),
            shape=(frame_low.size * self.num_stack,),
            dtype=np.float32,
        )
        self.pre_position = 0.0

    def reset(self, *, seed=None, options=None):
        """Reset the environment and fill the frame stack with the first observation.

        Args:
            seed: Optional seed forwarded to the wrapped environment's reset.
            options: Optional reset options forwarded to the wrapped environment.

        Returns:
            Tuple ``(frames, info)`` where ``frames`` is a ``LazyFrames`` over
            ``num_stack`` copies of the initial observation and
            ``info["episode_step"]`` is 0.
        """
        obs, info = self.env.reset(seed=seed, options=options)
        self._episode_step = 0
        self._episode_score = 0.0
        info["episode_step"] = self._episode_step
        # No history exists yet, so the initial observation fills every slot.
        for _ in range(self.num_stack):
            self.frames.append(obs)
        self.pre_position = obs[0]
        return LazyFrames(list(self.frames)), info

    def step(self, actions):
        """Step the environment and return the updated frame stack.

        Args:
            actions: Action passed unchanged to the wrapped environment.

        Returns:
            Tuple ``(frames, reward, terminated, truncated, info)`` where
            ``frames`` is a ``LazyFrames`` over the latest ``num_stack``
            observations and ``info`` holds ``"episode_step"`` and
            ``"episode_score"``.
        """
        observation, reward, terminated, truncated, info = self.env.step(actions)
        self._episode_step += 1
        self._episode_score += reward
        info["episode_step"] = self._episode_step
        info["episode_score"] = self._episode_score
        # The deque has maxlen=num_stack, so appending drops the oldest frame.
        self.frames.append(observation)
        # First observation component of the latest frame; nothing in this
        # class reads it back.
        self.pre_position = observation[0]
        return LazyFrames(list(self.frames)), reward, terminated, truncated, info
class LazyFrames(object):
    """Lazily concatenated stack of frames.

    This object ensures that common frames between the observations are only
    stored once. It exists purely to optimize memory usage which can be huge
    for DQN's 1M frames replay buffers. This object should only be converted
    to a numpy array before being passed to the model.

    Args:
        frames: Sequence of arrays that are concatenated along the last axis
            the first time the data is needed.
    """

    def __init__(self, frames):
        self._frames = frames
        self._out = None

    def _force(self):
        """Return the concatenated array, building and caching it on first use."""
        if self._out is None:
            self._out = np.concatenate(self._frames, axis=-1)
            # The individual frames are no longer needed once concatenated.
            self._frames = None
        return self._out

    def __array__(self, dtype=None, copy=None):
        """Array-conversion hook used by ``np.asarray`` / ``np.array``.

        Args:
            dtype: Optional target dtype. When given (and ``copy`` is not
                ``False``) a converted copy is returned.
            copy: ``copy`` keyword of NumPy's array-conversion protocol.
                ``None`` keeps the previous behaviour, ``True`` always returns
                a new array, ``False`` returns the cached array.

        Returns:
            The concatenated frames as a numpy array.

        Raises:
            ValueError: If ``copy=False`` is requested together with a dtype
                that differs from the stored one, since that needs a copy.
        """
        out = self._force()
        if dtype is None:
            return out.copy() if copy else out
        if copy is False:
            if np.dtype(dtype) != out.dtype:
                raise ValueError(
                    "Unable to avoid copy while converting LazyFrames to the requested dtype."
                )
            return out
        # astype always returns a new array, as it did before `copy` existed.
        return out.astype(dtype)

    def __len__(self):
        """Length of the first axis of the concatenated array."""
        return len(self._force())

    def __getitem__(self, i):
        """Index the concatenated array along its last axis."""
        return self._force()[..., i]