Source code for xuance.engine.run_basic

from typing import Optional
from abc import ABC, abstractmethod
from xuance.environment import make_envs


[docs] class RunnerBase(ABC): """Abstract base class for all engine in XuanCe. RunnerBase defines the common interface and shared infrastructure for all concrete engine (e.g., DRLRunner, MARLRunner). A runner is responsible for experiment orchestration, including: - Environment lifecycle management. - Training / testing / benchmarking workflows. - Resource ownership and cleanup semantics. - Rank-aware logging in distributed settings. Notes: - Algorithm-specific logic must remain in the Agent. - Subclasses must implement the abstract methods declared here. """ def __init__(self, config, envs=None, agent=None, manage_resources=None): """Initialize the runner base. This constructor handles environment creation and resource ownership inference. By default, environments are created internally using the provided configuration. Args: config: Experiment configuration object. envs (optional): Pre-created environments. If None, environments will be created internally via `make_envs(config)`. agent (optional): Pre-created agent instance. Ownership is inferred based on whether the agent is injected externally. manage_resources (optional): Whether the runner takes ownership of resource lifecycle management (envs.close(), agent.finish()). - If None, ownership is inferred automatically: `manage_resources = (_own_envs or _own_agent)`. - If True, the runner will finalize/close any existing `agent` and `envs` held by this runner instance at the end of `run()`, even if they were injected externally. - If False, the caller is responsible for closing/finalizing injected resources. """ # Build or attach environments if envs is None: # Runner owns environments created internally self.envs = make_envs(config) self._own_envs = True else: # Environments are injected externally self.envs = envs self._own_envs = False # Agent ownership flag (actual agent creation happens in subclasses) self._own_agent = True if agent is None else False # Infer resource management responsibility self.manage_resources = ( self._own_envs or self._own_agent ) if manage_resources is None else manage_resources # Reset environments once at initialization # Note: This assumes online RL-style engine. Subclasses should be aware # of this design choice if different behavior is required. self.envs.reset() # Number of parallel environments self.n_envs = self.envs.num_envs # Default rank (may be overridden by distributed engine) self.rank = 0 self.agent = agent @abstractmethod def _run_train(self, **kwargs): """Execute the training workflow. Subclasses must implement this method to define how training is performed. """ pass @abstractmethod def _run_test(self, **kwargs): """Execute the testing/evaluation workflow. Subclasses must implement this method to define how evaluation is performed. """ pass @abstractmethod def _run_benchmark(self, **kwargs): """Execute the benchmarking workflow. Subclasses must implement this method to define how benchmarking (training + periodic evaluation) is performed. """ pass
[docs] def run(self, mode: Optional[str] = None, **kwargs): """Run the experiment. Args: mode (str): Execution mode. Must be one of {'train', 'test', 'benchmark'}. """ handlers = { "train": self._run_train, "test": self._run_test, "benchmark": self._run_benchmark } if mode is None: raise ValueError( "Missing required argument: 'mode'. " "Please specify one of: 'train', 'test', and 'benchmark'.\n" "Example: runner.run(mode='benchmark')" ) if mode not in handlers: raise ValueError( f"Invalid run mode: '{mode}'. " "Supported modes are: 'train', 'test', and 'benchmark'." ) try: return handlers[mode](**kwargs) finally: self._finalize()
def _finalize(self): """Finalize resources held by the runner. This method releases resources referenced by this runner instance, such as environments and the agent. It is typically invoked in a `finally` block to ensure cleanup even when exceptions occur. Cleanup rules: - If `manage_resources` is False, this method performs no action. - If `manage_resources` is True, the runner takes ownership of the lifecycle of any `agent` and `envs` attached to this instance and will call `agent.finish()` / `envs.close()` when available, regardless of whether they were created internally or injected externally. Notes: - Subclasses may override this method to add extra cleanup steps, but should generally call `super()._finalize()` to preserve base cleanup. - After closing envs, `self.envs` is set to None to help prevent accidental re-use. """ if getattr(self, "manage_resources", True): # Finalize agent if it exists. if hasattr(self, "agent") and getattr(self, "agent") is not None: self.agent.finish() if hasattr(self, "agents") and getattr(self, "agents") is not None: self.agents.finish() # Close environments if they exist. if hasattr(self, "envs") and self.envs is not None: self.envs.close() # Help prevent accidental re-use after closing. self.envs = None else: return
[docs] def rprint(self, info: str): """Rank-aware print utility. Only prints messages on rank 0, which is useful for distributed training to avoid duplicated logs. Args: info: Message to print. """ if self.rank == 0: print(info)