Source code for highway_env.envs.common.abstract

import copy
import os
from typing import List, Tuple, Optional, Callable
import gym
from gym import Wrapper
from gym.wrappers import RecordVideo
from gym.utils import seeding
import numpy as np

from highway_env import utils
from highway_env.envs.common.action import action_factory, Action, DiscreteMetaAction, ActionType
from highway_env.envs.common.observation import observation_factory, ObservationType
from highway_env.envs.common.finite_mdp import finite_mdp
from highway_env.envs.common.graphics import EnvViewer
from highway_env.vehicle.behavior import IDMVehicle, LinearVehicle
from highway_env.vehicle.controller import MDPVehicle
from highway_env.vehicle.kinematics import Vehicle

# Type alias used in this module's signatures (e.g. the return of reset() and step()):
# an observation is annotated as a NumPy array.
Observation = np.ndarray

class AbstractEnv(gym.Env):

    """
    A generic environment for various tasks involving a vehicle driving on a road.

    The environment contains a road populated with vehicles, and a controlled ego-vehicle that can change lane and
    speed. The action space is fixed, but the observation space and reward function must be defined in the
    environment implementations.
    """

    observation_type: ObservationType
    action_type: ActionType
    _record_video_wrapper: Optional[RecordVideo]
    metadata = {
        'render.modes': ['human', 'rgb_array'],
    }

    PERCEPTION_DISTANCE = 5.0 * Vehicle.MAX_SPEED
    """The maximum distance of any vehicle present in the observation [m]"""

    def __init__(self, config: Optional[dict] = None) -> None:
        """
        Build the environment and reset it once.

        The constructor ends with a call to reset(), which in turn calls _reset(): environment
        implementations must therefore override _reset().

        :param config: optional overrides merged on top of default_config()
        """
        # Configuration
        self.config = self.default_config()
        self.configure(config)

        # Seeding
        self.np_random = None
        self.seed()

        # Scene
        self.road = None
        self.controlled_vehicles = []

        # Spaces
        self.action_type = None
        self.action_space = None
        self.observation_type = None
        self.observation_space = None
        self.define_spaces()

        # Running
        self.time = 0  # Simulation time, counted in simulation frames (see _simulate)
        self.steps = 0  # Actions performed
        self.done = False

        # Rendering
        self.viewer = None
        self._record_video_wrapper = None
        self.rendering_mode = 'human'
        self.enable_auto_render = False

        self.reset()

    @property
    def vehicle(self) -> Optional[Vehicle]:
        """First (default) controlled vehicle, or None when there is no controlled vehicle."""
        return self.controlled_vehicles[0] if self.controlled_vehicles else None

    @vehicle.setter
    def vehicle(self, vehicle: Vehicle) -> None:
        """Set a unique controlled vehicle."""
        self.controlled_vehicles = [vehicle]

    @classmethod
    def default_config(cls) -> dict:
        """
        Default environment configuration.

        Can be overloaded in environment implementations, or by calling configure().

        :return: a configuration dict
        """
        return {
            "observation": {
                "type": "Kinematics"
            },
            "action": {
                "type": "DiscreteMetaAction"
            },
            "simulation_frequency": 15,  # [Hz]
            "policy_frequency": 1,  # [Hz]
            "other_vehicles_type": "highway_env.vehicle.behavior.IDMVehicle",
            "screen_width": 600,  # [px]
            "screen_height": 150,  # [px]
            "centering_position": [0.3, 0.5],
            "scaling": 5.5,
            "show_trajectories": False,
            "render_agent": True,
            "offscreen_rendering": os.environ.get("OFFSCREEN_RENDERING", "0") == "1",
            "manual_control": False,
            "real_time_rendering": False
        }

    def seed(self, seed: Optional[int] = None) -> List[int]:
        """
        (Re)create the environment's random number generator.

        :param seed: the seed forwarded to gym's seeding.np_random
        :return: a single-element list holding the seed returned by seeding.np_random
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def configure(self, config: Optional[dict]) -> None:
        """
        Merge user-provided entries into the current configuration.

        :param config: entries to update; None or an empty dict leaves the configuration untouched
        """
        if config:
            self.config.update(config)

    def update_metadata(self, video_real_time_ratio=2):
        """
        Refresh the 'video.frames_per_second' metadata entry.

        The frame rate is based on the simulation frequency when a RecordVideo wrapper has been
        registered, and on the policy frequency otherwise.

        :param video_real_time_ratio: multiplier applied to the selected frequency
        """
        frames_freq = self.config["simulation_frequency"] \
            if self._record_video_wrapper else self.config["policy_frequency"]
        # NOTE(review): `metadata` is declared at class level and is never rebound on the instance in this
        # class, so this write lands in a dict shared between instances -- confirm this is intended.
        self.metadata['video.frames_per_second'] = video_real_time_ratio * frames_freq

    def define_spaces(self) -> None:
        """
        Set the types and spaces of observation and action from config.
        """
        self.observation_type = observation_factory(self, self.config["observation"])
        self.action_type = action_factory(self, self.config["action"])
        self.observation_space = self.observation_type.space()
        self.action_space = self.action_type.space()

    def _reward(self, action: Action) -> float:
        """
        Return the reward associated with performing a given action and ending up in the current state.

        :param action: the last action performed
        :return: the reward
        """
        raise NotImplementedError

    def _is_terminal(self) -> bool:
        """
        Check whether the current state is a terminal state

        :return: is the state terminal
        """
        raise NotImplementedError

    def _info(self, obs: Observation, action: Action) -> dict:
        """
        Return a dictionary of additional information

        :param obs: current observation
        :param action: current action
        :return: info dict
        """
        info = {
            "speed": self.vehicle.speed,
            "crashed": self.vehicle.crashed,
            "action": action,
        }
        # The cost is optional: environments that do not implement _cost() simply omit the key.
        try:
            info["cost"] = self._cost(action)
        except NotImplementedError:
            pass
        return info

    def _cost(self, action: Action) -> float:
        """
        A constraint metric, for budgeted MDP.

        If a constraint is defined, it must be used with an alternate reward that doesn't contain it as a penalty.

        :param action: the last action performed
        :return: the constraint signal
        """
        raise NotImplementedError

    def reset(self) -> Observation:
        """
        Reset the environment to its initial configuration

        :return: the observation of the reset state
        """
        self.update_metadata()
        self.define_spaces()  # First, to set the controlled vehicle class depending on action space
        self.time = self.steps = 0
        self.done = False
        self._reset()
        self.define_spaces()  # Second, to link the obs and actions to the vehicles once the scene is created
        return self.observation_type.observe()

    def _reset(self) -> None:
        """
        Reset the scene: roads and vehicles.

        This method must be overloaded by the environments.
        """
        raise NotImplementedError()

    def step(self, action: Action) -> Tuple[Observation, float, bool, dict]:
        """
        Perform an action and step the environment dynamics.

        The action is executed by the ego-vehicle, and all other vehicles on the road performs their default behaviour
        for several simulation timesteps until the next decision making step.

        :param action: the action performed by the ego-vehicle
        :return: a tuple (observation, reward, terminal, info)
        :raises NotImplementedError: if the road or the controlled vehicle has not been created
        """
        if self.road is None or self.vehicle is None:
            raise NotImplementedError("The road and vehicle must be initialized in the environment implementation")

        self.steps += 1
        self._simulate(action)

        obs = self.observation_type.observe()
        reward = self._reward(action)
        terminal = self._is_terminal()
        info = self._info(obs, action)

        return obs, reward, terminal, info

    def _simulate(self, action: Optional[Action] = None) -> None:
        """
        Perform several steps of simulation with constant action.

        :param action: the action to forward to the action type; None leaves the vehicles' controls unchanged
        """
        # Number of simulation frames per agent decision; computed once and reused below.
        frames = int(self.config["simulation_frequency"] // self.config["policy_frequency"])
        for frame in range(frames):
            # Forward action to the vehicle
            if action is not None \
                    and not self.config["manual_control"] \
                    and self.time % frames == 0:
                self.action_type.act(action)

            self.road.act()
            self.road.step(1 / self.config["simulation_frequency"])
            self.time += 1

            # Automatically render intermediate simulation steps if a viewer has been launched
            # Ignored if the rendering is done offscreen
            if frame < frames - 1:  # Last frame will be rendered through env.render() as usual
                self._automatic_rendering()

        # Automatic rendering is re-enabled by the next explicit call to render()
        self.enable_auto_render = False

    def render(self, mode: str = 'human') -> Optional[np.ndarray]:
        """
        Render the environment.

        Create a viewer if none exists, and use it to render an image.

        :param mode: the rendering mode
        :return: the viewer image when mode is 'rgb_array', None otherwise
        """
        self.rendering_mode = mode

        if self.viewer is None:
            self.viewer = EnvViewer(self)

        self.enable_auto_render = True

        self.viewer.display()

        if not self.viewer.offscreen:
            self.viewer.handle_events()
        if mode == 'rgb_array':
            return self.viewer.get_image()
        return None

    def close(self) -> None:
        """
        Close the environment.

        Will close the environment viewer if it exists.
        """
        self.done = True
        if self.viewer is not None:
            self.viewer.close()
        self.viewer = None

    def get_available_actions(self) -> List[int]:
        """
        Get the list of currently available actions.

        Lane changes are not available on the boundary of the road, and speed changes are not available at
        maximal or minimal speed.

        :return: the list of available actions
        :raises ValueError: if the action type is not a DiscreteMetaAction
        """
        if not isinstance(self.action_type, DiscreteMetaAction):
            raise ValueError("Only discrete meta-actions can be unavailable.")
        actions = [self.action_type.actions_indexes['IDLE']]
        for l_index in self.road.network.side_lanes(self.vehicle.lane_index):
            if l_index[2] < self.vehicle.lane_index[2] \
                    and self.road.network.get_lane(l_index).is_reachable_from(self.vehicle.position) \
                    and self.action_type.lateral:
                actions.append(self.action_type.actions_indexes['LANE_LEFT'])
            if l_index[2] > self.vehicle.lane_index[2] \
                    and self.road.network.get_lane(l_index).is_reachable_from(self.vehicle.position) \
                    and self.action_type.lateral:
                actions.append(self.action_type.actions_indexes['LANE_RIGHT'])
        if self.vehicle.speed_index < self.vehicle.target_speeds.size - 1 and self.action_type.longitudinal:
            actions.append(self.action_type.actions_indexes['FASTER'])
        if self.vehicle.speed_index > 0 and self.action_type.longitudinal:
            actions.append(self.action_type.actions_indexes['SLOWER'])
        return actions

    def set_record_video_wrapper(self, wrapper: RecordVideo):
        """
        Register the RecordVideo wrapper used to capture intermediate frames, and refresh the metadata.

        :param wrapper: the wrapper recording this environment
        """
        self._record_video_wrapper = wrapper
        self.update_metadata()

    def _automatic_rendering(self) -> None:
        """
        Automatically render the intermediate frames while an action is still ongoing.

        This allows to render the whole video and not only single steps corresponding to agent decision-making.
        If a RecordVideo wrapper has been set, use it to capture intermediate frames.
        """
        if self.viewer is not None and self.enable_auto_render:

            if self._record_video_wrapper and self._record_video_wrapper.video_recorder:
                self._record_video_wrapper.video_recorder.capture_frame()
            else:
                self.render(self.rendering_mode)

    def simplify(self) -> 'AbstractEnv':
        """
        Return a simplified copy of the environment where distant vehicles have been removed from the road.

        This is meant to lower the policy computational load while preserving the optimal actions set.

        :return: a simplified environment state
        """
        state_copy = copy.deepcopy(self)
        state_copy.road.vehicles = [state_copy.vehicle] + state_copy.road.close_vehicles_to(
            state_copy.vehicle, self.PERCEPTION_DISTANCE)

        return state_copy

    def change_vehicles(self, vehicle_class_path: str) -> 'AbstractEnv':
        """
        Change the type of all vehicles on the road

        :param vehicle_class_path: The path of the class of behavior for other vehicles
                             Example: "highway_env.vehicle.behavior.IDMVehicle"
        :return: a new environment with modified behavior model for other vehicles
        """
        vehicle_class = utils.class_from_path(vehicle_class_path)

        env_copy = copy.deepcopy(self)
        vehicles = env_copy.road.vehicles
        for i, v in enumerate(vehicles):
            if v is not env_copy.vehicle:
                vehicles[i] = vehicle_class.create_from(v)
        return env_copy

    def set_preferred_lane(self, preferred_lane: Optional[int] = None) -> 'AbstractEnv':
        """
        Return a copy of the environment where every IDMVehicle's route targets a given lane index.

        :param preferred_lane: the lane index written into each route element; None leaves routes unchanged
        :return: a copy of the environment
        """
        env_copy = copy.deepcopy(self)
        # Compare with None explicitly: lane index 0 is a valid preference but is falsy.
        if preferred_lane is not None:
            for v in env_copy.road.vehicles:
                if isinstance(v, IDMVehicle):
                    v.route = [(lane[0], lane[1], preferred_lane) for lane in v.route]
                    # Vehicle with lane preference are also less cautious
                    v.LANE_CHANGE_MAX_BRAKING_IMPOSED = 1000
        return env_copy

    def set_route_at_intersection(self, _to: str) -> 'AbstractEnv':
        """
        Return a copy of the environment where set_route_at_intersection(_to) was called on every IDMVehicle.

        :param _to: the argument forwarded to each vehicle's set_route_at_intersection
        :return: a copy of the environment
        """
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            if isinstance(v, IDMVehicle):
                v.set_route_at_intersection(_to)
        return env_copy

    def set_vehicle_field(self, args: Tuple[str, object]) -> 'AbstractEnv':
        """
        Return a copy of the environment where an attribute is set on every vehicle but the controlled one.

        :param args: a (field name, value) pair
        :return: a copy of the environment
        """
        field, value = args
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            # The copied vehicles are new objects: the ego-vehicle must be identified within the copy,
            # not through the original environment.
            if v is not env_copy.vehicle:
                setattr(v, field, value)
        return env_copy

    def call_vehicle_method(self, args: Tuple[str, Tuple[object]]) -> 'AbstractEnv':
        """
        Return a copy of the environment where a method was called on every vehicle that defines it.

        Each such vehicle is replaced, in the road's vehicle list, by the value returned by the call.

        :param args: a (method name, positional arguments) pair
        :return: a copy of the environment
        """
        method, method_args = args
        env_copy = copy.deepcopy(self)
        for i, v in enumerate(env_copy.road.vehicles):
            if hasattr(v, method):
                env_copy.road.vehicles[i] = getattr(v, method)(*method_args)
        return env_copy

    def randomize_behavior(self) -> 'AbstractEnv':
        """
        Return a copy of the environment where randomize_behavior() was called on every IDMVehicle.

        :return: a copy of the environment
        """
        env_copy = copy.deepcopy(self)
        for v in env_copy.road.vehicles:
            if isinstance(v, IDMVehicle):
                v.randomize_behavior()
        return env_copy

    def to_finite_mdp(self):
        """
        Build a finite MDP from this environment, with a time quantization of one policy period.

        :return: the result of finite_mdp()
        """
        return finite_mdp(self, time_quantization=1/self.config["policy_frequency"])

    def __deepcopy__(self, memo):
        """Perform a deep copy but without copying the environment viewer."""
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            # The viewer and the video wrapper are reset to None in the copy instead of being duplicated
            if k not in ['viewer', '_record_video_wrapper']:
                setattr(result, k, copy.deepcopy(v, memo))
            else:
                setattr(result, k, None)
        return result
class MultiAgentWrapper(Wrapper):
    """
    Wrapper exposing per-agent rewards and termination flags.

    The reward and done values returned by the wrapped step are replaced by the
    "agents_rewards" and "agents_dones" entries of its info dict.
    """

    def step(self, action):
        """
        Step the wrapped environment and substitute the per-agent signals.

        :param action: the action forwarded to the wrapped step
        :return: a tuple (observation, agents rewards, agents dones, info)
        """
        observation, _, _, info = super().step(action)
        return observation, info["agents_rewards"], info["agents_dones"], info