# Source code for xuance.environment.vector_envs.vector_env
from abc import ABC, abstractmethod
# referenced from openai/baselines
[docs]
class AlreadySteppingError(Exception):
    """Signal that an async step is already running.

    The message defaults to the fixed text below. It is accepted as an
    optional argument so that the instance can be rebuilt from
    ``self.args`` -- which is how pickling and copying re-create
    exceptions. With a zero-argument ``__init__`` that reconstruction
    fails with ``TypeError``.

    Args:
        msg: Text of the error; callers normally omit it.
    """

    def __init__(self, msg='already running an async step'):
        super().__init__(msg)
[docs]
class NotSteppingError(Exception):
    """Signal that no async step is currently running.

    The message defaults to the fixed text below. It is accepted as an
    optional argument so that the instance can be rebuilt from
    ``self.args`` -- which is how pickling and copying re-create
    exceptions. With a zero-argument ``__init__`` that reconstruction
    fails with ``TypeError``.

    Args:
        msg: Text of the error; callers normally omit it.
    """

    def __init__(self, msg='not running an async step'):
        super().__init__(msg)
[docs]
class VecEnv(ABC):
    """Abstract interface for a batch of environments stepped together.

    Subclasses must implement ``reset``, ``step_async`` and ``step_wait``.
    ``step`` and ``close`` are provided on top of those.

    Args:
        num_envs: Number of environments in the batch.
        observation_space: Observation space, stored as given.
        action_space: Action space, stored as given.
    """

    def __init__(self, num_envs, observation_space, action_space):
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.action_space = action_space
        # Flipped to True by close() so that cleanup runs only once.
        self.closed = False

    @abstractmethod
    def reset(self):
        """
        Reset all the environments and return an array of
        observations, or a dict of observation arrays.
        If step_async is still doing work, that work will
        be cancelled and step_wait() should not be called
        until step_async() is invoked again.
        """
        pass

    @abstractmethod
    def step_async(self, actions):
        """
        Tell all the environments to start taking a step
        with the given actions.
        Call step_wait() to get the results of the step.
        You should not call this if a step_async run is
        already pending.
        """
        pass

    @abstractmethod
    def step_wait(self):
        """
        Wait for the step taken with step_async().
        Returns (obs, rews, dones, infos):
        - obs: an array of observations, or a dict of
        arrays of observations.
        - rews: an array of rewards
        - dones: an array of "episode done" booleans
        - infos: a sequence of info objects
        """
        pass

    def close_extras(self):
        """
        Release subclass-specific resources.
        Called by close() the first time it runs. The default
        does nothing; subclasses override it when they have
        something to clean up.
        """
        pass

    def step(self, actions):
        """
        Step the environments synchronously: start the step with
        step_async(actions) and return the result of step_wait().
        """
        self.step_async(actions)
        return self.step_wait()

    def render(self, mode):
        """
        Render the environments. Not implemented here; subclasses
        that support rendering must override it.
        """
        raise NotImplementedError

    def close(self):
        """
        Run close_extras() and mark the object as closed.
        Calling close() again afterwards is a no-op.
        """
        if self.closed:
            return
        self.close_extras()
        self.closed = True